From 159a3b9746e27789202b4ba4d68d99a86ffae20d Mon Sep 17 00:00:00 2001 From: David Holiday Date: Wed, 17 Jul 2024 23:27:20 -0600 Subject: [PATCH 01/10] experiment --- .../kotlin/fhirengine/azure/FHIRFunctions.kt | 9 +++---- .../kotlin/fhirengine/engine/QueueMessage.kt | 24 +++++++++++++++++++ 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt b/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt index b58eda44f50..3c1a17c6d4d 100644 --- a/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt +++ b/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt @@ -101,10 +101,11 @@ class FHIRFunctions( ) { val messagesToDispatch = runFhirEngine(message, dequeueCount, fhirEngine, actionHistory) messagesToDispatch.forEach { - queueAccess.sendMessage( - elrTranslationQueueName, - it.serialize() - ) + it.send(queueAccess) +// queueAccess.sendMessage( +// elrTranslationQueueName, +// it.serialize() +// ) } } diff --git a/prime-router/src/main/kotlin/fhirengine/engine/QueueMessage.kt b/prime-router/src/main/kotlin/fhirengine/engine/QueueMessage.kt index d2e68dbf50c..5dce17c74c7 100644 --- a/prime-router/src/main/kotlin/fhirengine/engine/QueueMessage.kt +++ b/prime-router/src/main/kotlin/fhirengine/engine/QueueMessage.kt @@ -1,5 +1,11 @@ package gov.cdc.prime.router.fhirengine.engine +import gov.cdc.prime.router.fhirengine.engine.elrConvertQueueName +import gov.cdc.prime.router.fhirengine.engine.elrDestinationFilterQueueName +import gov.cdc.prime.router.fhirengine.engine.elrReceiverFilterQueueName +import gov.cdc.prime.router.fhirengine.engine.elrRoutingQueueName +import gov.cdc.prime.router.fhirengine.engine.elrSendQueueName +import gov.cdc.prime.router.fhirengine.engine.elrTranslationQueueName import com.fasterxml.jackson.annotation.JsonSubTypes import com.fasterxml.jackson.annotation.JsonTypeInfo import com.fasterxml.jackson.annotation.JsonTypeName @@ -13,6 +19,8 @@ import gov.cdc.prime.router.ReportId import gov.cdc.prime.router.Topic import gov.cdc.prime.router.azure.BlobAccess import gov.cdc.prime.router.azure.Event +import gov.cdc.prime.router.azure.QueueAccess +import gov.cdc.prime.router.fhirengine.azure.FHIRFunctions import java.util.Base64 import java.util.UUID @@ -35,6 +43,22 @@ private const val MESSAGE_SIZE_LIMIT = 64 * 1000 JsonSubTypes.Type(ReportEventQueueMessage::class, name = "report") ) abstract class QueueMessage { + + //abstract fun getClass(): Class + + fun send(queueAccess: QueueAccess): Unit { + + when (this.javaClass) { + FhirRouteQueueMessage::class.java -> { + queueAccess.sendMessage( + elrTranslationQueueName, + serialize() + ) + } + } + + } + fun serialize(): String { val bytes = mapper.writeValueAsBytes(this) check(bytes.size < MESSAGE_SIZE_LIMIT) { "Message is too big for the queue." } From af8c30f62ba2e72bd1abe54b8cf94fe61c9fa055 Mon Sep 17 00:00:00 2001 From: David Holiday Date: Thu, 18 Jul 2024 18:10:16 -0600 Subject: [PATCH 02/10] two failing tests but I think the tests are at fault --- .../kotlin/fhirengine/azure/FHIRFunctions.kt | 36 ++++++++++--------- .../kotlin/fhirengine/engine/QueueMessage.kt | 24 +++++++++++++ 2 files changed, 44 insertions(+), 16 deletions(-) diff --git a/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt b/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt index 3c1a17c6d4d..d8f18d81302 100644 --- a/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt +++ b/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt @@ -64,10 +64,11 @@ class FHIRFunctions( ) { val messagesToDispatch = runFhirEngine(message, dequeueCount, fhirEngine, actionHistory) messagesToDispatch.forEach { - queueAccess.sendMessage( - elrDestinationFilterQueueName, - it.serialize() - ) + it.send(queueAccess) +// queueAccess.sendMessage( +// elrDestinationFilterQueueName, +// it.serialize() +// ) } } @@ -138,10 +139,11 @@ class FHIRFunctions( val messagesToDispatch = runFhirEngine(message, dequeueCount, fhirEngine, actionHistory) messagesToDispatch.forEach { - queueAccess.sendMessage( - elrReceiverFilterQueueName, - it.serialize() - ) + it.send(queueAccess) +// queueAccess.sendMessage( +// elrReceiverFilterQueueName, +// it.serialize() +// ) } } @@ -173,10 +175,11 @@ class FHIRFunctions( ) { val messagesToDispatch = runFhirEngine(message, dequeueCount, fhirEngine, actionHistory) messagesToDispatch.forEach { - queueAccess.sendMessage( - elrTranslationQueueName, - it.serialize() - ) + it.send(queueAccess) +// queueAccess.sendMessage( +// elrTranslationQueueName, +// it.serialize() +// ) } } @@ -209,10 +212,11 @@ class FHIRFunctions( val messagesToDispatch = runFhirEngine(message, dequeueCount, fhirEngine, actionHistory) // Only dispatches event if Topic.isSendOriginal was true messagesToDispatch.forEach { - queueAccess.sendMessage( - elrSendQueueName, - it.serialize() - ) + it.send(queueAccess) +// queueAccess.sendMessage( +// elrSendQueueName, +// it.serialize() +// ) } } diff --git a/prime-router/src/main/kotlin/fhirengine/engine/QueueMessage.kt b/prime-router/src/main/kotlin/fhirengine/engine/QueueMessage.kt index 5dce17c74c7..7a93f922c5e 100644 --- a/prime-router/src/main/kotlin/fhirengine/engine/QueueMessage.kt +++ b/prime-router/src/main/kotlin/fhirengine/engine/QueueMessage.kt @@ -49,7 +49,31 @@ abstract class QueueMessage { fun send(queueAccess: QueueAccess): Unit { when (this.javaClass) { + FhirConvertQueueMessage::class.java -> { + queueAccess.sendMessage( + elrConvertQueueName, + serialize() + ) + } FhirRouteQueueMessage::class.java -> { + queueAccess.sendMessage( + elrRoutingQueueName, + serialize() + ) + } + FhirDestinationFilterQueueMessage::class.java -> { + queueAccess.sendMessage( + elrDestinationFilterQueueName, + serialize() + ) + } + FhirReceiverFilterQueueMessage::class.java -> { + queueAccess.sendMessage( + elrReceiverFilterQueueName, + serialize() + ) + } + FhirTranslateQueueMessage::class.java -> { queueAccess.sendMessage( elrTranslationQueueName, serialize() From 3e0473bd0d88a7ba33e3506b0b6c3573ef302ab4 Mon Sep 17 00:00:00 2001 From: David Holiday Date: Sat, 20 Jul 2024 14:39:35 -0600 Subject: [PATCH 03/10] should be gtg --- .../kotlin/fhirengine/azure/FHIRFunctions.kt | 21 ------------------- .../kotlin/fhirengine/engine/QueueMessage.kt | 7 +++++++ .../fhirengine/azure/FhirFunctionTests.kt | 6 ++++-- 3 files changed, 11 insertions(+), 23 deletions(-) diff --git a/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt b/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt index d8f18d81302..9c27a16cb35 100644 --- a/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt +++ b/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt @@ -24,7 +24,6 @@ import gov.cdc.prime.router.fhirengine.engine.elrConvertQueueName import gov.cdc.prime.router.fhirengine.engine.elrDestinationFilterQueueName import gov.cdc.prime.router.fhirengine.engine.elrReceiverFilterQueueName import gov.cdc.prime.router.fhirengine.engine.elrRoutingQueueName -import gov.cdc.prime.router.fhirengine.engine.elrSendQueueName import gov.cdc.prime.router.fhirengine.engine.elrTranslationQueueName import org.apache.commons.lang3.StringUtils import org.apache.logging.log4j.kotlin.Logging @@ -65,10 +64,6 @@ class FHIRFunctions( val messagesToDispatch = runFhirEngine(message, dequeueCount, fhirEngine, actionHistory) messagesToDispatch.forEach { it.send(queueAccess) -// queueAccess.sendMessage( -// elrDestinationFilterQueueName, -// it.serialize() -// ) } } @@ -103,10 +98,6 @@ class FHIRFunctions( val messagesToDispatch = runFhirEngine(message, dequeueCount, fhirEngine, actionHistory) messagesToDispatch.forEach { it.send(queueAccess) -// queueAccess.sendMessage( -// elrTranslationQueueName, -// it.serialize() -// ) } } @@ -140,10 +131,6 @@ class FHIRFunctions( messagesToDispatch.forEach { it.send(queueAccess) -// queueAccess.sendMessage( -// elrReceiverFilterQueueName, -// it.serialize() -// ) } } @@ -176,10 +163,6 @@ class FHIRFunctions( val messagesToDispatch = runFhirEngine(message, dequeueCount, fhirEngine, actionHistory) messagesToDispatch.forEach { it.send(queueAccess) -// queueAccess.sendMessage( -// elrTranslationQueueName, -// it.serialize() -// ) } } @@ -213,10 +196,6 @@ class FHIRFunctions( // Only dispatches event if Topic.isSendOriginal was true messagesToDispatch.forEach { it.send(queueAccess) -// queueAccess.sendMessage( -// elrSendQueueName, -// it.serialize() -// ) } } diff --git a/prime-router/src/main/kotlin/fhirengine/engine/QueueMessage.kt b/prime-router/src/main/kotlin/fhirengine/engine/QueueMessage.kt index 7a93f922c5e..13964de2291 100644 --- a/prime-router/src/main/kotlin/fhirengine/engine/QueueMessage.kt +++ b/prime-router/src/main/kotlin/fhirengine/engine/QueueMessage.kt @@ -79,6 +79,13 @@ abstract class QueueMessage { serialize() ) } + ReportEventQueueMessage::class.java -> { + queueAccess.sendMessage( + elrSendQueueName, + serialize() + ) + } + } } diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FhirFunctionTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FhirFunctionTests.kt index 5a67e29f6d8..fc944009612 100644 --- a/prime-router/src/test/kotlin/fhirengine/azure/FhirFunctionTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/azure/FhirFunctionTests.kt @@ -27,6 +27,7 @@ import gov.cdc.prime.router.fhirengine.engine.FHIRRouter import gov.cdc.prime.router.fhirengine.engine.FHIRTranslator import gov.cdc.prime.router.fhirengine.engine.FhirDestinationFilterQueueMessage import gov.cdc.prime.router.fhirengine.engine.FhirRouteQueueMessage +import gov.cdc.prime.router.fhirengine.engine.FhirTranslateQueueMessage import gov.cdc.prime.router.fhirengine.engine.QueueMessage import gov.cdc.prime.router.fhirengine.engine.elrDestinationFilterQueueName import gov.cdc.prime.router.fhirengine.engine.elrTranslationQueueName @@ -244,12 +245,13 @@ class FhirFunctionTests { emptyMap(), emptyList() ) - val message = FhirRouteQueueMessage( + val message = FhirTranslateQueueMessage( report.id, "", "", "ignore.ignore-full-elr", - Topic.FULL_ELR + Topic.FULL_ELR, + "" ) every { fhirEngine.doWork(any(), any(), any()) } returns listOf( FHIREngine.FHIREngineRunResult( From 5fab6d1f24936d65ad467616a3c51471f5eb23b5 Mon Sep 17 00:00:00 2001 From: David Holiday Date: Sat, 20 Jul 2024 14:40:24 -0600 Subject: [PATCH 04/10] asdf --- .../main/kotlin/fhirengine/engine/QueueMessage.kt | 14 ++------------ .../kotlin/fhirengine/azure/FhirFunctionTests.kt | 1 - 2 files changed, 2 insertions(+), 13 deletions(-) diff --git a/prime-router/src/main/kotlin/fhirengine/engine/QueueMessage.kt b/prime-router/src/main/kotlin/fhirengine/engine/QueueMessage.kt index 13964de2291..21408fdc54d 100644 --- a/prime-router/src/main/kotlin/fhirengine/engine/QueueMessage.kt +++ b/prime-router/src/main/kotlin/fhirengine/engine/QueueMessage.kt @@ -1,11 +1,5 @@ package gov.cdc.prime.router.fhirengine.engine -import gov.cdc.prime.router.fhirengine.engine.elrConvertQueueName -import gov.cdc.prime.router.fhirengine.engine.elrDestinationFilterQueueName -import gov.cdc.prime.router.fhirengine.engine.elrReceiverFilterQueueName -import gov.cdc.prime.router.fhirengine.engine.elrRoutingQueueName -import gov.cdc.prime.router.fhirengine.engine.elrSendQueueName -import gov.cdc.prime.router.fhirengine.engine.elrTranslationQueueName import com.fasterxml.jackson.annotation.JsonSubTypes import com.fasterxml.jackson.annotation.JsonTypeInfo import com.fasterxml.jackson.annotation.JsonTypeName @@ -20,7 +14,6 @@ import gov.cdc.prime.router.Topic import gov.cdc.prime.router.azure.BlobAccess import gov.cdc.prime.router.azure.Event import gov.cdc.prime.router.azure.QueueAccess -import gov.cdc.prime.router.fhirengine.azure.FHIRFunctions import java.util.Base64 import java.util.UUID @@ -44,10 +37,9 @@ private const val MESSAGE_SIZE_LIMIT = 64 * 1000 ) abstract class QueueMessage { - //abstract fun getClass(): Class - - fun send(queueAccess: QueueAccess): Unit { + // abstract fun getClass(): Class + fun send(queueAccess: QueueAccess) { when (this.javaClass) { FhirConvertQueueMessage::class.java -> { queueAccess.sendMessage( @@ -85,9 +77,7 @@ abstract class QueueMessage { serialize() ) } - } - } fun serialize(): String { diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FhirFunctionTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FhirFunctionTests.kt index fc944009612..b3d425d8469 100644 --- a/prime-router/src/test/kotlin/fhirengine/azure/FhirFunctionTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/azure/FhirFunctionTests.kt @@ -26,7 +26,6 @@ import gov.cdc.prime.router.fhirengine.engine.FHIREngine import gov.cdc.prime.router.fhirengine.engine.FHIRRouter import gov.cdc.prime.router.fhirengine.engine.FHIRTranslator import gov.cdc.prime.router.fhirengine.engine.FhirDestinationFilterQueueMessage -import gov.cdc.prime.router.fhirengine.engine.FhirRouteQueueMessage import gov.cdc.prime.router.fhirengine.engine.FhirTranslateQueueMessage import gov.cdc.prime.router.fhirengine.engine.QueueMessage import gov.cdc.prime.router.fhirengine.engine.elrDestinationFilterQueueName From 3430b31b6eed8d6455e5a7534d38e325f76a01dd Mon Sep 17 00:00:00 2001 From: David Holiday Date: Sat, 20 Jul 2024 15:35:40 -0600 Subject: [PATCH 05/10] removes dead line of code --- prime-router/src/main/kotlin/fhirengine/engine/QueueMessage.kt | 3 --- 1 file changed, 3 deletions(-) diff --git a/prime-router/src/main/kotlin/fhirengine/engine/QueueMessage.kt b/prime-router/src/main/kotlin/fhirengine/engine/QueueMessage.kt index 21408fdc54d..6871ee4188a 100644 --- a/prime-router/src/main/kotlin/fhirengine/engine/QueueMessage.kt +++ b/prime-router/src/main/kotlin/fhirengine/engine/QueueMessage.kt @@ -36,9 +36,6 @@ private const val MESSAGE_SIZE_LIMIT = 64 * 1000 JsonSubTypes.Type(ReportEventQueueMessage::class, name = "report") ) abstract class QueueMessage { - - // abstract fun getClass(): Class - fun send(queueAccess: QueueAccess) { when (this.javaClass) { FhirConvertQueueMessage::class.java -> { From 4ae2e37e786c0e4df52db240699a8de1fa8b3c0b Mon Sep 17 00:00:00 2001 From: David Holiday Date: Tue, 30 Jul 2024 15:45:40 -0600 Subject: [PATCH 06/10] refactored queuemessage --- prime-router/src/main/kotlin/azure/Event.kt | 1 + .../kotlin/fhirengine/engine/FHIREngine.kt | 12 ++-- .../kotlin/fhirengine/engine/QueueMessage.kt | 58 +++++++------------ 3 files changed, 28 insertions(+), 43 deletions(-) diff --git a/prime-router/src/main/kotlin/azure/Event.kt b/prime-router/src/main/kotlin/azure/Event.kt index 6d326507358..ebc41baad92 100644 --- a/prime-router/src/main/kotlin/azure/Event.kt +++ b/prime-router/src/main/kotlin/azure/Event.kt @@ -8,6 +8,7 @@ import gov.cdc.prime.router.fhirengine.engine.BatchEventQueueMessage import gov.cdc.prime.router.fhirengine.engine.ProcessEventQueueMessage import gov.cdc.prime.router.fhirengine.engine.QueueMessage import gov.cdc.prime.router.fhirengine.engine.ReportEventQueueMessage +import gov.cdc.prime.router.fhirengine.engine.elrSendQueueName import gov.cdc.prime.router.transport.RetryToken import java.time.OffsetDateTime import java.time.format.DateTimeFormatter diff --git a/prime-router/src/main/kotlin/fhirengine/engine/FHIREngine.kt b/prime-router/src/main/kotlin/fhirengine/engine/FHIREngine.kt index d9eb7eec43f..aa2adbea5a1 100644 --- a/prime-router/src/main/kotlin/fhirengine/engine/FHIREngine.kt +++ b/prime-router/src/main/kotlin/fhirengine/engine/FHIREngine.kt @@ -23,12 +23,12 @@ import gov.cdc.prime.router.serializers.Hl7Serializer import org.jooq.Field import java.time.OffsetDateTime -const val elrConvertQueueName = "elr-fhir-convert" -const val elrRoutingQueueName = "elr-fhir-route" -const val elrDestinationFilterQueueName = "elr-fhir-destination-filter" -const val elrReceiverFilterQueueName = "elr-fhir-receiver-filter" -const val elrTranslationQueueName = "elr-fhir-translate" -const val elrSendQueueName = "send" +//const val elrConvertQueueName = "elr-fhir-convert" +//const val elrRoutingQueueName = "elr-fhir-route" +//const val elrDestinationFilterQueueName = "elr-fhir-destination-filter" +//const val elrReceiverFilterQueueName = "elr-fhir-receiver-filter" +//const val elrTranslationQueueName = "elr-fhir-translate" +//const val elrSendQueueName = "send" /** * All logical processing for full ELR / FHIR processing should be within this class. diff --git a/prime-router/src/main/kotlin/fhirengine/engine/QueueMessage.kt b/prime-router/src/main/kotlin/fhirengine/engine/QueueMessage.kt index 6871ee4188a..25b73b84fcd 100644 --- a/prime-router/src/main/kotlin/fhirengine/engine/QueueMessage.kt +++ b/prime-router/src/main/kotlin/fhirengine/engine/QueueMessage.kt @@ -17,6 +17,15 @@ import gov.cdc.prime.router.azure.QueueAccess import java.util.Base64 import java.util.UUID + +const val elrConvertQueueName = "elr-fhir-convert" +const val elrRoutingQueueName = "elr-fhir-route" +const val elrDestinationFilterQueueName = "elr-fhir-destination-filter" +const val elrReceiverFilterQueueName = "elr-fhir-receiver-filter" +const val elrTranslationQueueName = "elr-fhir-translate" +const val elrSendQueueName = "send" + + // This is a size limit dictated by our infrastructure in azure // https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-azure-and-service-bus-queues-compared-contrasted private const val MESSAGE_SIZE_LIMIT = 64 * 1000 @@ -36,44 +45,11 @@ private const val MESSAGE_SIZE_LIMIT = 64 * 1000 JsonSubTypes.Type(ReportEventQueueMessage::class, name = "report") ) abstract class QueueMessage { + abstract val messageQueueName: String + fun send(queueAccess: QueueAccess) { - when (this.javaClass) { - FhirConvertQueueMessage::class.java -> { - queueAccess.sendMessage( - elrConvertQueueName, - serialize() - ) - } - FhirRouteQueueMessage::class.java -> { - queueAccess.sendMessage( - elrRoutingQueueName, - serialize() - ) - } - FhirDestinationFilterQueueMessage::class.java -> { - queueAccess.sendMessage( - elrDestinationFilterQueueName, - serialize() - ) - } - FhirReceiverFilterQueueMessage::class.java -> { - queueAccess.sendMessage( - elrReceiverFilterQueueName, - serialize() - ) - } - FhirTranslateQueueMessage::class.java -> { - queueAccess.sendMessage( - elrTranslationQueueName, - serialize() - ) - } - ReportEventQueueMessage::class.java -> { - queueAccess.sendMessage( - elrSendQueueName, - serialize() - ) - } + if (this.messageQueueName.isNotEmpty()) { + queueAccess.sendMessage(this.messageQueueName, serialize()) } } @@ -138,6 +114,7 @@ data class FhirConvertQueueMessage( override val blobSubFolderName: String, override val topic: Topic, val schemaName: String = "", + override val messageQueueName: String = elrConvertQueueName ) : ReportPipelineMessage() @JsonTypeName("route") @@ -147,6 +124,7 @@ data class FhirRouteQueueMessage( override val digest: String, override val blobSubFolderName: String, override val topic: Topic, + override val messageQueueName: String = elrRoutingQueueName ) : ReportPipelineMessage() @JsonTypeName("destination-filter") @@ -156,6 +134,7 @@ data class FhirDestinationFilterQueueMessage( override val digest: String, override val blobSubFolderName: String, override val topic: Topic, + override val messageQueueName: String = elrDestinationFilterQueueName ) : ReportPipelineMessage() @JsonTypeName("receiver-filter") @@ -166,6 +145,7 @@ data class FhirReceiverFilterQueueMessage( override val blobSubFolderName: String, override val topic: Topic, val receiverFullName: String, + override val messageQueueName: String = elrReceiverFilterQueueName ) : ReportPipelineMessage() @JsonTypeName("translate") @@ -176,6 +156,7 @@ data class FhirTranslateQueueMessage( override val blobSubFolderName: String, override val topic: Topic, val receiverFullName: String, + override val messageQueueName: String = elrTranslationQueueName ) : ReportPipelineMessage() abstract class WithEventAction : QueueMessage() { @@ -188,6 +169,7 @@ data class BatchEventQueueMessage( val receiverName: String, val emptyBatch: Boolean, val at: String, + override val messageQueueName: String = "" ) : WithEventAction() @JsonTypeName("report") @@ -196,6 +178,7 @@ data class ReportEventQueueMessage( val emptyBatch: Boolean, val reportId: UUID, val at: String, + override val messageQueueName: String = elrSendQueueName ) : WithEventAction() @JsonTypeName("process") @@ -206,4 +189,5 @@ data class ProcessEventQueueMessage( val defaults: Map, val routeTo: List, val at: String, + override val messageQueueName: String = "" ) : WithEventAction() \ No newline at end of file From 80e887db46187ebd41251a1265225886f50bbef4 Mon Sep 17 00:00:00 2001 From: David Holiday Date: Tue, 30 Jul 2024 18:06:05 -0600 Subject: [PATCH 07/10] nearly there --- prime-router/src/main/kotlin/azure/Event.kt | 1 - .../kotlin/fhirengine/azure/FHIRFunctions.kt | 182 ++++++++++-------- .../kotlin/fhirengine/engine/FHIREngine.kt | 6 - .../kotlin/fhirengine/engine/QueueMessage.kt | 18 +- .../azure/FHIRConverterIntegrationTests.kt | 16 +- .../FHIRDestinationFilterIntegrationTests.kt | 10 +- .../FHIRReceiverFilterIntegrationTests.kt | 31 ++- .../azure/FHIRRouterIntegrationTests.kt | 28 ++- .../azure/FhirFunctionIntegrationTests.kt | 27 ++- .../fhirengine/azure/FhirFunctionTests.kt | 9 +- 10 files changed, 193 insertions(+), 135 deletions(-) diff --git a/prime-router/src/main/kotlin/azure/Event.kt b/prime-router/src/main/kotlin/azure/Event.kt index ebc41baad92..6d326507358 100644 --- a/prime-router/src/main/kotlin/azure/Event.kt +++ b/prime-router/src/main/kotlin/azure/Event.kt @@ -8,7 +8,6 @@ import gov.cdc.prime.router.fhirengine.engine.BatchEventQueueMessage import gov.cdc.prime.router.fhirengine.engine.ProcessEventQueueMessage import gov.cdc.prime.router.fhirengine.engine.QueueMessage import gov.cdc.prime.router.fhirengine.engine.ReportEventQueueMessage -import gov.cdc.prime.router.fhirengine.engine.elrSendQueueName import gov.cdc.prime.router.transport.RetryToken import java.time.OffsetDateTime import java.time.format.DateTimeFormatter diff --git a/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt b/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt index 9c27a16cb35..ce5b7311cc8 100644 --- a/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt +++ b/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt @@ -35,6 +35,7 @@ class FHIRFunctions( private val queueAccess: QueueAccess = QueueAccess, ) : Logging { + /** * An azure function for ingesting full-ELR HL7 data and converting it to FHIR */ @@ -46,26 +47,27 @@ class FHIRFunctions( // Number of times this message has been dequeued @BindingName("DequeueCount") dequeueCount: Int = 1, ) { - doConvert(message, dequeueCount, FHIRConverter()) + process(message, dequeueCount, FHIRConverter(), ActionHistory(TaskAction.convert)) + //doConvert(message, dequeueCount, FHIRConverter()) } - /** - * Functionality separated from azure function call so a mocked fhirEngine can be passed in for testing. - * Reads the [message] passed in and processes it using the appropriate [fhirEngine]. If there is an error - * the [dequeueCount] is tracked as part of the log. - * [actionHistory] is an optional parameter for use in testing - */ - internal fun doConvert( - message: String, - dequeueCount: Int, - fhirEngine: FHIREngine, - actionHistory: ActionHistory = ActionHistory(TaskAction.convert), - ) { - val messagesToDispatch = runFhirEngine(message, dequeueCount, fhirEngine, actionHistory) - messagesToDispatch.forEach { - it.send(queueAccess) - } - } +// /** +// * Functionality separated from azure function call so a mocked fhirEngine can be passed in for testing. +// * Reads the [message] passed in and processes it using the appropriate [fhirEngine]. If there is an error +// * the [dequeueCount] is tracked as part of the log. +// * [actionHistory] is an optional parameter for use in testing +// */ +// internal fun doConvert( +// message: String, +// dequeueCount: Int, +// fhirEngine: FHIREngine, +// actionHistory: ActionHistory = ActionHistory(TaskAction.convert), +// ) { +// val messagesToDispatch = runFhirEngine(message, dequeueCount, fhirEngine, actionHistory) +// messagesToDispatch.forEach { +// it.send(queueAccess) +// } +// } // TODO: remove after route queue empty (see https://github.com/CDCgov/prime-reportstream/issues/15039) /** @@ -79,27 +81,28 @@ class FHIRFunctions( // Number of times this message has been dequeued @BindingName("DequeueCount") dequeueCount: Int = 1, ) { - doRoute(message, dequeueCount, FHIRRouter()) + //doRoute(message, dequeueCount, FHIRRouter()) + process(message, dequeueCount, FHIRRouter(), ActionHistory(TaskAction.route)) } - // TODO: remove after route queue empty (see https://github.com/CDCgov/prime-reportstream/issues/15039) - /** - * Functionality separated from azure function call so a mocked fhirEngine can be passed in for testing. - * Reads the [message] passed in and processes it using the appropriate [fhirEngine]. If there is an error - * the [dequeueCount] is tracked as part of the log. - * [actionHistory] is an optional parameter for use in testing - */ - internal fun doRoute( - message: String, - dequeueCount: Int, - fhirEngine: FHIRRouter, - actionHistory: ActionHistory = ActionHistory(TaskAction.route), - ) { - val messagesToDispatch = runFhirEngine(message, dequeueCount, fhirEngine, actionHistory) - messagesToDispatch.forEach { - it.send(queueAccess) - } - } +// // TODO: remove after route queue empty (see https://github.com/CDCgov/prime-reportstream/issues/15039) +// /** +// * Functionality separated from azure function call so a mocked fhirEngine can be passed in for testing. +// * Reads the [message] passed in and processes it using the appropriate [fhirEngine]. If there is an error +// * the [dequeueCount] is tracked as part of the log. +// * [actionHistory] is an optional parameter for use in testing +// */ +// internal fun doRoute( +// message: String, +// dequeueCount: Int, +// fhirEngine: FHIRRouter, +// actionHistory: ActionHistory = ActionHistory(TaskAction.route), +// ) { +// val messagesToDispatch = runFhirEngine(message, dequeueCount, fhirEngine, actionHistory) +// messagesToDispatch.forEach { +// it.send(queueAccess) +// } +// } /** * An azure function for selecting valid destinations for inbound full-ELR FHIR data. @@ -112,27 +115,28 @@ class FHIRFunctions( // Number of times this message has been dequeued @BindingName("DequeueCount") dequeueCount: Int = 1, ) { - doDestinationFilter(message, dequeueCount, FHIRDestinationFilter()) + //doDestinationFilter(message, dequeueCount, FHIRDestinationFilter()) + process(message, dequeueCount, FHIRDestinationFilter(), ActionHistory(TaskAction.destination_filter)) } - /** - * Functionality separated from azure function call so a mocked fhirEngine can be passed in for testing. - * Reads the [message] passed in and processes it using the appropriate [fhirEngine]. If there is an error - * the [dequeueCount] is tracked as part of the log. - * [actionHistory] is an optional parameter for use in testing - */ - internal fun doDestinationFilter( - message: String, - dequeueCount: Int, - fhirEngine: FHIRDestinationFilter, - actionHistory: ActionHistory = ActionHistory(TaskAction.destination_filter), - ) { - val messagesToDispatch = runFhirEngine(message, dequeueCount, fhirEngine, actionHistory) - - messagesToDispatch.forEach { - it.send(queueAccess) - } - } +// /** +// * Functionality separated from azure function call so a mocked fhirEngine can be passed in for testing. +// * Reads the [message] passed in and processes it using the appropriate [fhirEngine]. If there is an error +// * the [dequeueCount] is tracked as part of the log. +// * [actionHistory] is an optional parameter for use in testing +// */ +// internal fun doDestinationFilter( +// message: String, +// dequeueCount: Int, +// fhirEngine: FHIRDestinationFilter, +// actionHistory: ActionHistory = ActionHistory(TaskAction.destination_filter), +// ) { +// val messagesToDispatch = runFhirEngine(message, dequeueCount, fhirEngine, actionHistory) +// +// messagesToDispatch.forEach { +// it.send(queueAccess) +// } +// } /** * An azure function for running receiver filters on full-ELR FHIR data @@ -145,26 +149,27 @@ class FHIRFunctions( // Number of times this message has been dequeued @BindingName("DequeueCount") dequeueCount: Int = 1, ) { - doReceiverFilter(message, dequeueCount, FHIRReceiverFilter()) + //doReceiverFilter(message, dequeueCount, FHIRReceiverFilter()) + process(message, dequeueCount, FHIRReceiverFilter(), ActionHistory(TaskAction.receiver_filter)) } - /** - * Functionality separated from azure function call so a mocked fhirEngine can be passed in for testing. - * Reads the [message] passed in and processes it using the appropriate [fhirEngine]. If there is an error - * the [dequeueCount] is tracked as part of the log. - * [actionHistory] is an optional parameter for use in testing - */ - internal fun doReceiverFilter( - message: String, - dequeueCount: Int, - fhirEngine: FHIRReceiverFilter, - actionHistory: ActionHistory = ActionHistory(TaskAction.receiver_filter), - ) { - val messagesToDispatch = runFhirEngine(message, dequeueCount, fhirEngine, actionHistory) - messagesToDispatch.forEach { - it.send(queueAccess) - } - } +// /** +// * Functionality separated from azure function call so a mocked fhirEngine can be passed in for testing. +// * Reads the [message] passed in and processes it using the appropriate [fhirEngine]. If there is an error +// * the [dequeueCount] is tracked as part of the log. +// * [actionHistory] is an optional parameter for use in testing +// */ +// internal fun doReceiverFilter( +// message: String, +// dequeueCount: Int, +// fhirEngine: FHIRReceiverFilter, +// actionHistory: ActionHistory = ActionHistory(TaskAction.receiver_filter), +// ) { +// val messagesToDispatch = runFhirEngine(message, dequeueCount, fhirEngine, actionHistory) +// messagesToDispatch.forEach { +// it.send(queueAccess) +// } +// } /** * An azure function for translating full-ELR FHIR data. @@ -177,23 +182,42 @@ class FHIRFunctions( // Number of times this message has been dequeued @BindingName("DequeueCount") dequeueCount: Int = 1, ) { - doTranslate(message, dequeueCount, FHIRTranslator()) + //doTranslate(message, dequeueCount, FHIRTranslator()) + process(message, dequeueCount, FHIRTranslator(), ActionHistory(TaskAction.translate)) } +// /** +// * Functionality separated from azure function call so a mocked fhirEngine can be passed in for testing. +// * Reads the [message] passed in and processes it using the appropriate [fhirEngine]. If there is an error +// * the [dequeueCount] is tracked as part of the log. +// * [actionHistory] is an optional parameter for use in testing +// */ +// fun doTranslate( +// message: String, +// dequeueCount: Int, +// fhirEngine: FHIRTranslator, +// actionHistory: ActionHistory = ActionHistory(TaskAction.translate), +// ) { +// val messagesToDispatch = runFhirEngine(message, dequeueCount, fhirEngine, actionHistory) +// // Only dispatches event if Topic.isSendOriginal was true +// messagesToDispatch.forEach { +// it.send(queueAccess) +// } +// } + /** * Functionality separated from azure function call so a mocked fhirEngine can be passed in for testing. * Reads the [message] passed in and processes it using the appropriate [fhirEngine]. If there is an error * the [dequeueCount] is tracked as part of the log. * [actionHistory] is an optional parameter for use in testing */ - fun doTranslate( + internal fun process( message: String, dequeueCount: Int, - fhirEngine: FHIRTranslator, - actionHistory: ActionHistory = ActionHistory(TaskAction.translate), + fhirEngine: FHIREngine, + actionHistory: ActionHistory, ) { val messagesToDispatch = runFhirEngine(message, dequeueCount, fhirEngine, actionHistory) - // Only dispatches event if Topic.isSendOriginal was true messagesToDispatch.forEach { it.send(queueAccess) } diff --git a/prime-router/src/main/kotlin/fhirengine/engine/FHIREngine.kt b/prime-router/src/main/kotlin/fhirengine/engine/FHIREngine.kt index aa2adbea5a1..fbf6ee72e03 100644 --- a/prime-router/src/main/kotlin/fhirengine/engine/FHIREngine.kt +++ b/prime-router/src/main/kotlin/fhirengine/engine/FHIREngine.kt @@ -23,12 +23,6 @@ import gov.cdc.prime.router.serializers.Hl7Serializer import org.jooq.Field import java.time.OffsetDateTime -//const val elrConvertQueueName = "elr-fhir-convert" -//const val elrRoutingQueueName = "elr-fhir-route" -//const val elrDestinationFilterQueueName = "elr-fhir-destination-filter" -//const val elrReceiverFilterQueueName = "elr-fhir-receiver-filter" -//const val elrTranslationQueueName = "elr-fhir-translate" -//const val elrSendQueueName = "send" /** * All logical processing for full ELR / FHIR processing should be within this class. diff --git a/prime-router/src/main/kotlin/fhirengine/engine/QueueMessage.kt b/prime-router/src/main/kotlin/fhirengine/engine/QueueMessage.kt index 25b73b84fcd..8a1cc6f72fd 100644 --- a/prime-router/src/main/kotlin/fhirengine/engine/QueueMessage.kt +++ b/prime-router/src/main/kotlin/fhirengine/engine/QueueMessage.kt @@ -17,7 +17,6 @@ import gov.cdc.prime.router.azure.QueueAccess import java.util.Base64 import java.util.UUID - const val elrConvertQueueName = "elr-fhir-convert" const val elrRoutingQueueName = "elr-fhir-route" const val elrDestinationFilterQueueName = "elr-fhir-destination-filter" @@ -25,7 +24,6 @@ const val elrReceiverFilterQueueName = "elr-fhir-receiver-filter" const val elrTranslationQueueName = "elr-fhir-translate" const val elrSendQueueName = "send" - // This is a size limit dictated by our infrastructure in azure // https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-azure-and-service-bus-queues-compared-contrasted private const val MESSAGE_SIZE_LIMIT = 64 * 1000 @@ -114,7 +112,7 @@ data class FhirConvertQueueMessage( override val blobSubFolderName: String, override val topic: Topic, val schemaName: String = "", - override val messageQueueName: String = elrConvertQueueName + override val messageQueueName: String = elrConvertQueueName, ) : ReportPipelineMessage() @JsonTypeName("route") @@ -124,7 +122,7 @@ data class FhirRouteQueueMessage( override val digest: String, override val blobSubFolderName: String, override val topic: Topic, - override val messageQueueName: String = elrRoutingQueueName + override val messageQueueName: String = elrRoutingQueueName, ) : ReportPipelineMessage() @JsonTypeName("destination-filter") @@ -134,7 +132,7 @@ data class FhirDestinationFilterQueueMessage( override val digest: String, override val blobSubFolderName: String, override val topic: Topic, - override val messageQueueName: String = elrDestinationFilterQueueName + override val messageQueueName: String = elrDestinationFilterQueueName, ) : ReportPipelineMessage() @JsonTypeName("receiver-filter") @@ -145,7 +143,7 @@ data class FhirReceiverFilterQueueMessage( override val blobSubFolderName: String, override val topic: Topic, val receiverFullName: String, - override val messageQueueName: String = elrReceiverFilterQueueName + override val messageQueueName: String = elrReceiverFilterQueueName, ) : ReportPipelineMessage() @JsonTypeName("translate") @@ -156,7 +154,7 @@ data class FhirTranslateQueueMessage( override val blobSubFolderName: String, override val topic: Topic, val receiverFullName: String, - override val messageQueueName: String = elrTranslationQueueName + override val messageQueueName: String = elrTranslationQueueName, ) : ReportPipelineMessage() abstract class WithEventAction : QueueMessage() { @@ -169,7 +167,7 @@ data class BatchEventQueueMessage( val receiverName: String, val emptyBatch: Boolean, val at: String, - override val messageQueueName: String = "" + override val messageQueueName: String = "", ) : WithEventAction() @JsonTypeName("report") @@ -178,7 +176,7 @@ data class ReportEventQueueMessage( val emptyBatch: Boolean, val reportId: UUID, val at: String, - override val messageQueueName: String = elrSendQueueName + override val messageQueueName: String = elrSendQueueName, ) : WithEventAction() @JsonTypeName("process") @@ -189,5 +187,5 @@ data class ProcessEventQueueMessage( val defaults: Map, val routeTo: List, val at: String, - override val messageQueueName: String = "" + override val messageQueueName: String = "", ) : WithEventAction() \ No newline at end of file diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FHIRConverterIntegrationTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FHIRConverterIntegrationTests.kt index cb995649dc9..d6a1b8c7c32 100644 --- a/prime-router/src/test/kotlin/fhirengine/azure/FHIRConverterIntegrationTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/azure/FHIRConverterIntegrationTests.kt @@ -15,6 +15,7 @@ import gov.cdc.prime.router.Options import gov.cdc.prime.router.Report import gov.cdc.prime.router.Sender import gov.cdc.prime.router.Topic +import gov.cdc.prime.router.azure.ActionHistory import gov.cdc.prime.router.azure.BlobAccess import gov.cdc.prime.router.azure.DatabaseLookupTableAccess import gov.cdc.prime.router.azure.Event @@ -241,7 +242,8 @@ class FHIRConverterIntegrationTests { val queueMessage = generateQueueMessage(receiveReport, receivedReportContents, hl7SenderWithNoTransform) val fhirFunctions = createFHIRFunctionsInstance() - fhirFunctions.doConvert(queueMessage, 1, createFHIRConverter()) + //fhirFunctions.doConvert(queueMessage, 1, createFHIRConverter()) + fhirFunctions.process(queueMessage, 1, createFHIRConverter(), ActionHistory(TaskAction.convert)) ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> val (routedReports, _) = fetchChildReports( @@ -406,7 +408,8 @@ class FHIRConverterIntegrationTests { ) val fhirFunctions = createFHIRFunctionsInstance() - fhirFunctions.doConvert(queueMessage, 1, createFHIRConverter()) + //fhirFunctions.doConvert(queueMessage, 1, createFHIRConverter()) + fhirFunctions.process(queueMessage, 1, createFHIRConverter(), ActionHistory(TaskAction.convert)) ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> val (routedReports, _) = fetchChildReports( @@ -530,7 +533,8 @@ class FHIRConverterIntegrationTests { val queueMessage = generateQueueMessage(receiveReport, receivedReportContents, senderWithValidation) val fhirFunctions = createFHIRFunctionsInstance() - fhirFunctions.doConvert(queueMessage, 1, createFHIRConverter()) + //fhirFunctions.doConvert(queueMessage, 1, createFHIRConverter()) + fhirFunctions.process(queueMessage, 1, createFHIRConverter(), ActionHistory(TaskAction.convert)) ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> val (routedReports, notRouted) = fetchChildReports( @@ -638,7 +642,8 @@ class FHIRConverterIntegrationTests { val queueMessage = generateQueueMessage(receiveReport, receivedReportContents, hl7Sender) val fhirFunctions = createFHIRFunctionsInstance() - fhirFunctions.doConvert(queueMessage, 1, createFHIRConverter()) + //fhirFunctions.doConvert(queueMessage, 1, createFHIRConverter()) + fhirFunctions.process(queueMessage, 1, createFHIRConverter(), ActionHistory(TaskAction.convert)) ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> val routedReports = fetchChildReports(receiveReport, txn, 2) @@ -710,7 +715,8 @@ class FHIRConverterIntegrationTests { val queueMessage = generateQueueMessage(receiveReport, receivedReportContents, hl7Sender) val fhirFunctions = createFHIRFunctionsInstance() - fhirFunctions.doConvert(queueMessage, 1, createFHIRConverter()) + //fhirFunctions.doConvert(queueMessage, 1, createFHIRConverter()) + fhirFunctions.process(queueMessage, 1, createFHIRConverter(), ActionHistory(TaskAction.convert)) verify(exactly = 0) { QueueAccess.sendMessage(any(), any()) diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FHIRDestinationFilterIntegrationTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FHIRDestinationFilterIntegrationTests.kt index 2863f53857a..c86aa02fbf7 100644 --- a/prime-router/src/test/kotlin/fhirengine/azure/FHIRDestinationFilterIntegrationTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/azure/FHIRDestinationFilterIntegrationTests.kt @@ -15,6 +15,7 @@ import gov.cdc.prime.router.Report import gov.cdc.prime.router.ReportStreamFilter import gov.cdc.prime.router.Sender import gov.cdc.prime.router.Topic +import gov.cdc.prime.router.azure.ActionHistory import gov.cdc.prime.router.azure.BlobAccess import gov.cdc.prime.router.azure.DatabaseLookupTableAccess import gov.cdc.prime.router.azure.Event @@ -185,7 +186,8 @@ class FHIRDestinationFilterIntegrationTests : Logging { val destinationFilter = createDestinationFilter(azureEventsService, org) // execute - fhirFunctions.doDestinationFilter(queueMessage, 1, destinationFilter) + // fhirFunctions.doDestinationFilter(queueMessage, 1, destinationFilter) + fhirFunctions.process(queueMessage, 1, destinationFilter, ActionHistory(TaskAction.destination_filter)) // check results ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> @@ -274,7 +276,8 @@ class FHIRDestinationFilterIntegrationTests : Logging { val destinationFilter = createDestinationFilter(azureEventsService, org) // execute - fhirFunctions.doDestinationFilter(queueMessage, 1, destinationFilter) + //fhirFunctions.doDestinationFilter(queueMessage, 1, destinationFilter) + fhirFunctions.process(queueMessage, 1, destinationFilter, ActionHistory(TaskAction.destination_filter)) // check results ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> @@ -367,7 +370,8 @@ class FHIRDestinationFilterIntegrationTests : Logging { val destinationFilter = createDestinationFilter(azureEventsService, org) // execute - fhirFunctions.doDestinationFilter(queueMessage, 1, destinationFilter) + // fhirFunctions.doDestinationFilter(queueMessage, 1, destinationFilter) + fhirFunctions.process(queueMessage, 1, destinationFilter, ActionHistory(TaskAction.destination_filter)) // no messages should have been routed due to filter verify(exactly = 0) { diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FHIRReceiverFilterIntegrationTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FHIRReceiverFilterIntegrationTests.kt index d798ac838f9..511f7fd171c 100644 --- a/prime-router/src/test/kotlin/fhirengine/azure/FHIRReceiverFilterIntegrationTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/azure/FHIRReceiverFilterIntegrationTests.kt @@ -20,6 +20,7 @@ import gov.cdc.prime.router.ReportStreamFilter import gov.cdc.prime.router.ReportStreamFilterType import gov.cdc.prime.router.Sender import gov.cdc.prime.router.Topic +import gov.cdc.prime.router.azure.ActionHistory import gov.cdc.prime.router.azure.BlobAccess import gov.cdc.prime.router.azure.DatabaseLookupTableAccess import gov.cdc.prime.router.azure.Event @@ -263,7 +264,8 @@ class FHIRReceiverFilterIntegrationTests : Logging { val fhirFunctions = UniversalPipelineTestUtils.createFHIRFunctionsInstance() // execute - fhirFunctions.doReceiverFilter(queueMessage, 1, receiverFilter) + //fhirFunctions.doReceiverFilter(queueMessage, 1, receiverFilter) + fhirFunctions.process(queueMessage, 1, receiverFilter, ActionHistory(TaskAction.receiver_filter)) // check results ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> @@ -344,7 +346,8 @@ class FHIRReceiverFilterIntegrationTests : Logging { val fhirFunctions = UniversalPipelineTestUtils.createFHIRFunctionsInstance() // execute - fhirFunctions.doReceiverFilter(queueMessage, 1, receiverFilter) + //fhirFunctions.doReceiverFilter(queueMessage, 1, receiverFilter) + fhirFunctions.process(queueMessage, 1, receiverFilter, ActionHistory(TaskAction.receiver_filter)) // check results ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> @@ -441,7 +444,8 @@ class FHIRReceiverFilterIntegrationTests : Logging { val fhirFunctions = UniversalPipelineTestUtils.createFHIRFunctionsInstance() // execute - fhirFunctions.doReceiverFilter(queueMessage, 1, receiverFilter) + //fhirFunctions.doReceiverFilter(queueMessage, 1, receiverFilter) + fhirFunctions.process(queueMessage, 1, receiverFilter, ActionHistory(TaskAction.receiver_filter)) // check results ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> @@ -512,7 +516,8 @@ class FHIRReceiverFilterIntegrationTests : Logging { val fhirFunctions = UniversalPipelineTestUtils.createFHIRFunctionsInstance() // execute - fhirFunctions.doReceiverFilter(queueMessage, 1, receiverFilter) + //fhirFunctions.doReceiverFilter(queueMessage, 1, receiverFilter) + fhirFunctions.process(queueMessage, 1, receiverFilter, ActionHistory(TaskAction.receiver_filter)) // check results ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> @@ -614,7 +619,8 @@ class FHIRReceiverFilterIntegrationTests : Logging { val fhirFunctions = UniversalPipelineTestUtils.createFHIRFunctionsInstance() // execute - fhirFunctions.doReceiverFilter(queueMessage, 1, receiverFilter) + //fhirFunctions.doReceiverFilter(queueMessage, 1, receiverFilter) + fhirFunctions.process(queueMessage, 1, receiverFilter, ActionHistory(TaskAction.receiver_filter)) // check results ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> @@ -694,7 +700,8 @@ class FHIRReceiverFilterIntegrationTests : Logging { val fhirFunctions = UniversalPipelineTestUtils.createFHIRFunctionsInstance() // execute - fhirFunctions.doReceiverFilter(queueMessage, 1, receiverFilter) + //fhirFunctions.doReceiverFilter(queueMessage, 1, receiverFilter) + fhirFunctions.process(queueMessage, 1, receiverFilter, ActionHistory(TaskAction.receiver_filter)) // check results ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> @@ -789,7 +796,8 @@ class FHIRReceiverFilterIntegrationTests : Logging { val fhirFunctions = UniversalPipelineTestUtils.createFHIRFunctionsInstance() // execute - fhirFunctions.doReceiverFilter(queueMessage, 1, receiverFilter) + //fhirFunctions.doReceiverFilter(queueMessage, 1, receiverFilter) + fhirFunctions.process(queueMessage, 1, receiverFilter, ActionHistory(TaskAction.receiver_filter)) // check queue message verify(exactly = 0) { @@ -905,7 +913,8 @@ class FHIRReceiverFilterIntegrationTests : Logging { val fhirFunctions = UniversalPipelineTestUtils.createFHIRFunctionsInstance() // execute - fhirFunctions.doReceiverFilter(queueMessage, 1, receiverFilter) + //fhirFunctions.doReceiverFilter(queueMessage, 1, receiverFilter) + fhirFunctions.process(queueMessage, 1, receiverFilter, ActionHistory(TaskAction.receiver_filter)) // check results ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> @@ -975,7 +984,8 @@ class FHIRReceiverFilterIntegrationTests : Logging { val fhirFunctions = UniversalPipelineTestUtils.createFHIRFunctionsInstance() // execute - fhirFunctions.doReceiverFilter(queueMessage, 1, receiverFilter) + //fhirFunctions.doReceiverFilter(queueMessage, 1, receiverFilter) + fhirFunctions.process(queueMessage, 1, receiverFilter, ActionHistory(TaskAction.receiver_filter)) // check results ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> @@ -1041,7 +1051,8 @@ class FHIRReceiverFilterIntegrationTests : Logging { val fhirFunctions = UniversalPipelineTestUtils.createFHIRFunctionsInstance() // execute - fhirFunctions.doReceiverFilter(queueMessage, 1, receiverFilter) + //fhirFunctions.doReceiverFilter(queueMessage, 1, receiverFilter) + fhirFunctions.process(queueMessage, 1, receiverFilter, ActionHistory(TaskAction.receiver_filter)) // check queue verify(exactly = 0) { diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FHIRRouterIntegrationTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FHIRRouterIntegrationTests.kt index 8edf1e5af70..40bede469eb 100644 --- a/prime-router/src/test/kotlin/fhirengine/azure/FHIRRouterIntegrationTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/azure/FHIRRouterIntegrationTests.kt @@ -20,6 +20,7 @@ import gov.cdc.prime.router.ReportStreamFilterResult import gov.cdc.prime.router.ReportStreamFilterType import gov.cdc.prime.router.Sender import gov.cdc.prime.router.Topic +import gov.cdc.prime.router.azure.ActionHistory import gov.cdc.prime.router.azure.BlobAccess import gov.cdc.prime.router.azure.DatabaseLookupTableAccess import gov.cdc.prime.router.azure.Event @@ -490,7 +491,8 @@ class FHIRRouterIntegrationTests : Logging { val org = createOrganizationWithReceivers(receiverList) val fhirRouter = createFHIRRouter(org) - fhirFunctions.doRoute(queueMessage, 1, fhirRouter) + //fhirFunctions.doRoute(queueMessage, 1, fhirRouter) + fhirFunctions.process(queueMessage, 1, fhirRouter, ActionHistory(TaskAction.route)) // check results ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> @@ -589,7 +591,8 @@ class FHIRRouterIntegrationTests : Logging { val receivers = createReceivers(receiverSetupData) val org = createOrganizationWithReceivers(receivers) val fhirRouter = createFHIRRouter(org) - fhirFunctions.doRoute(queueMessage, 1, fhirRouter) + //fhirFunctions.doRoute(queueMessage, 1, fhirRouter) + fhirFunctions.process(queueMessage, 1, fhirRouter, ActionHistory(TaskAction.route)) // check results ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> @@ -684,7 +687,8 @@ class FHIRRouterIntegrationTests : Logging { val receivers = createReceivers(listOf(ReceiverSetupData("x", jurisdictionalFilter = jurisdictionalFilterCo))) val org = createOrganizationWithReceivers(receivers) val fhirRouter = createFHIRRouter(org) - fhirFunctions.doRoute(queueMessage, 1, fhirRouter) + //fhirFunctions.doRoute(queueMessage, 1, fhirRouter) + fhirFunctions.process(queueMessage, 1, fhirRouter, ActionHistory(TaskAction.route)) // check results ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> @@ -751,7 +755,8 @@ class FHIRRouterIntegrationTests : Logging { val receivers = createReceivers(receiverSetupData) val org = createOrganizationWithReceivers(receivers) val fhirRouter = createFHIRRouter(org) - fhirFunctions.doRoute(queueMessage, 1, fhirRouter) + //fhirFunctions.doRoute(queueMessage, 1, fhirRouter) + fhirFunctions.process(queueMessage, 1, fhirRouter, ActionHistory(TaskAction.route)) // no messages should have been routed due to filter verify(exactly = 0) { @@ -803,7 +808,8 @@ class FHIRRouterIntegrationTests : Logging { val receivers = createReceivers(receiverSetupData) val org = createOrganizationWithReceivers(receivers) val fhirRouter = createFHIRRouter(org) - fhirFunctions.doRoute(queueMessage, 1, fhirRouter) + //fhirFunctions.doRoute(queueMessage, 1, fhirRouter) + fhirFunctions.process(queueMessage, 1, fhirRouter, ActionHistory(TaskAction.route)) // no messages should have been routed due to filter verify(exactly = 0) { @@ -870,7 +876,8 @@ class FHIRRouterIntegrationTests : Logging { val receivers = createReceivers(receiverSetupData) val org = createOrganizationWithReceivers(receivers) val fhirRouter = createFHIRRouter(org) - fhirFunctions.doRoute(queueMessage, 1, fhirRouter) + //fhirFunctions.doRoute(queueMessage, 1, fhirRouter) + fhirFunctions.process(queueMessage, 1, fhirRouter, ActionHistory(TaskAction.route)) // check results ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> @@ -945,7 +952,8 @@ class FHIRRouterIntegrationTests : Logging { val receivers = createReceivers(receiverSetupData) val org = createOrganizationWithReceivers(receivers) val fhirRouter = createFHIRRouter(org) - fhirFunctions.doRoute(queueMessage, 1, fhirRouter) + //fhirFunctions.doRoute(queueMessage, 1, fhirRouter) + fhirFunctions.process(queueMessage, 1, fhirRouter, ActionHistory(TaskAction.route)) // no messages should have been routed due to filter verify(exactly = 0) { @@ -1007,7 +1015,8 @@ class FHIRRouterIntegrationTests : Logging { val receivers = createReceivers(receiverSetupData) val org = createOrganizationWithReceivers(receivers) val fhirRouter = createFHIRRouter(org) - fhirFunctions.doRoute(queueMessage, 1, fhirRouter) + //fhirFunctions.doRoute(queueMessage, 1, fhirRouter) + fhirFunctions.process(queueMessage, 1, fhirRouter, ActionHistory(TaskAction.route)) // check results ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> @@ -1083,7 +1092,8 @@ class FHIRRouterIntegrationTests : Logging { val receivers = createReceivers(receiverSetupData) val org = createOrganizationWithReceivers(receivers) val fhirRouter = createFHIRRouter(org) - fhirFunctions.doRoute(queueMessage, 1, fhirRouter) + //fhirFunctions.doRoute(queueMessage, 1, fhirRouter) + fhirFunctions.process(queueMessage, 1, fhirRouter, ActionHistory(TaskAction.route)) // check results verify(exactly = 0) { diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FhirFunctionIntegrationTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FhirFunctionIntegrationTests.kt index 4124435c3cd..88d805c9736 100644 --- a/prime-router/src/test/kotlin/fhirengine/azure/FhirFunctionIntegrationTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/azure/FhirFunctionIntegrationTests.kt @@ -399,7 +399,8 @@ class FhirFunctionIntegrationTests { databaseAccess = ReportStreamTestDatabaseContainer.testDatabaseAccess ) assertThrows { - fhirFunc.doConvert(queueMessage, 1, fhirEngine, actionHistory) + //fhirFunc.doConvert(queueMessage, 1, fhirEngine, actionHistory) + fhirFunc.process(queueMessage, 1, fhirEngine, actionHistory) } val processTask = ReportStreamTestDatabaseContainer.testDatabaseAccess.fetchTask(report.id) @@ -479,7 +480,8 @@ class FhirFunctionIntegrationTests { workflowEngine, databaseAccess = ReportStreamTestDatabaseContainer.testDatabaseAccess ) - fhirFunc.doConvert(queueMessage, 1, fhirEngine, actionHistory) + //fhirFunc.doConvert(queueMessage, 1, fhirEngine, actionHistory) + fhirFunc.process(queueMessage, 1, fhirEngine, actionHistory) val processTask = ReportStreamTestDatabaseContainer.testDatabaseAccess.fetchTask(report.id) assertThat(processTask.processedAt).isNotNull() @@ -560,7 +562,8 @@ class FhirFunctionIntegrationTests { workflowEngine, databaseAccess = ReportStreamTestDatabaseContainer.testDatabaseAccess ) - fhirFunc.doConvert(queueMessage, 1, fhirEngine, actionHistory) + //fhirFunc.doConvert(queueMessage, 1, fhirEngine, actionHistory) + fhirFunc.process(queueMessage, 1, fhirEngine, actionHistory) val processTask = ReportStreamTestDatabaseContainer.testDatabaseAccess.fetchTask(report.id) assertThat(processTask.processedAt).isNotNull() @@ -671,7 +674,8 @@ class FhirFunctionIntegrationTests { actionLogger = actionLogger, databaseAccess = ReportStreamTestDatabaseContainer.testDatabaseAccess ) - fhirFunc.doConvert(queueMessage, 1, fhirEngine, actionHistory) + //fhirFunc.doConvert(queueMessage, 1, fhirEngine, actionHistory) + fhirFunc.process(queueMessage, 1, fhirEngine, actionHistory) val processTask = ReportStreamTestDatabaseContainer.testDatabaseAccess.fetchTask(report.id) assertThat(processTask.processedAt).isNotNull() @@ -786,7 +790,8 @@ class FhirFunctionIntegrationTests { databaseAccess = ReportStreamTestDatabaseContainer.testDatabaseAccess ) - fhirFunc.doRoute(queueMessage, 1, fhirEngine, actionHistory) + //fhirFunc.doRoute(queueMessage, 1, fhirEngine, actionHistory) + fhirFunc.process(queueMessage, 1, fhirEngine, actionHistory) val convertTask = ReportStreamTestDatabaseContainer.testDatabaseAccess.fetchTask(report.id) assertThat(convertTask.routedAt).isNotNull() @@ -942,7 +947,8 @@ class FhirFunctionIntegrationTests { databaseAccess = ReportStreamTestDatabaseContainer.testDatabaseAccess ) - fhirFunc.doTranslate(queueMessage, 1, fhirEngine, actionHistory) + //fhirFunc.doTranslate(queueMessage, 1, fhirEngine, actionHistory) + fhirFunc.process(queueMessage, 1, fhirEngine, actionHistory) // verify task and report_file tables were updated correctly in the Translate function (new task and new // record file created) @@ -1112,7 +1118,8 @@ class FhirFunctionIntegrationTests { databaseAccess = ReportStreamTestDatabaseContainer.testDatabaseAccess ) - fhirFunc.doTranslate(queueMessage, 1, fhirEngine, actionHistory) + //fhirFunc.doTranslate(queueMessage, 1, fhirEngine, actionHistory) + fhirFunc.process(queueMessage, 1, fhirEngine, actionHistory) // verify task and report_file tables were updated correctly in the Translate function ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> @@ -1212,7 +1219,8 @@ class FhirFunctionIntegrationTests { workflowEngine, databaseAccess = ReportStreamTestDatabaseContainer.testDatabaseAccess ) - fhirFunc.doConvert(queueMessage, 1, fhirEngine, actionHistory) + //fhirFunc.doConvert(queueMessage, 1, fhirEngine, actionHistory) + fhirFunc.process(queueMessage, 1, fhirEngine, actionHistory) val processTask = ReportStreamTestDatabaseContainer.testDatabaseAccess.fetchTask(report.id) assertThat(processTask.processedAt).isNotNull() @@ -1294,7 +1302,8 @@ class FhirFunctionIntegrationTests { databaseAccess = ReportStreamTestDatabaseContainer.testDatabaseAccess ) - fhirFunc.doConvert(queueMessage, 1, fhirEngine, actionHistory) + //fhirFunc.doConvert(queueMessage, 1, fhirEngine, actionHistory) + fhirFunc.process(queueMessage, 1, fhirEngine, actionHistory) val processTask = ReportStreamTestDatabaseContainer.testDatabaseAccess.fetchTask(report.id) assertThat(processTask.processedAt).isNotNull() diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FhirFunctionTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FhirFunctionTests.kt index b3d425d8469..7107f7efca4 100644 --- a/prime-router/src/test/kotlin/fhirengine/azure/FhirFunctionTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/azure/FhirFunctionTests.kt @@ -182,7 +182,8 @@ class FhirFunctionTests { "\"ignore.ignore-full-elr\",\"schemaName\":\"someSchema\",\"topic\":\"full-elr\"}" // act - fhirFunc.doConvert(queueMessage, 1, fhirEngine, actionHistory) + //fhirFunc.doConvert(queueMessage, 1, fhirEngine, actionHistory) + fhirFunc.process(queueMessage, 1, fhirEngine, actionHistory) // assert verify(exactly = 1) { @@ -269,7 +270,8 @@ class FhirFunctionTests { "\"blobSubFolderName\":\"ignore.ignore-full-elr\",\"topic\":\"full-elr\"}" // act - fhirFunc.doRoute(queueMessage, 1, fhirEngine, actionHistory) + //fhirFunc.doRoute(queueMessage, 1, fhirEngine, actionHistory) + fhirFunc.process(queueMessage, 1, fhirEngine, actionHistory) // assert verify(exactly = 1) { @@ -347,7 +349,8 @@ class FhirFunctionTests { "\"blobSubFolderName\":\"ignore.ignore-full-elr\",\"topic\":\"full-elr\",\"receiverFullName\":\"elr.phd\"}" // act - fhirFunc.doTranslate(queueMessage, 1, fhirEngine, actionHistory) + //fhirFunc.doTranslate(queueMessage, 1, fhirEngine, actionHistory) + fhirFunc.process(queueMessage, 1, fhirEngine, actionHistory) // assert verify(exactly = 1) { From 2388e1a4eadfc75303269c4b410cfffd59269db6 Mon Sep 17 00:00:00 2001 From: David Holiday Date: Tue, 30 Jul 2024 18:24:45 -0600 Subject: [PATCH 08/10] done and done --- .../kotlin/fhirengine/azure/FHIRFunctions.kt | 101 +----------------- .../azure/FHIRConverterIntegrationTests.kt | 5 - .../FHIRDestinationFilterIntegrationTests.kt | 3 - .../FHIRReceiverFilterIntegrationTests.kt | 10 -- .../azure/FHIRRouterIntegrationTests.kt | 9 -- .../azure/FhirFunctionIntegrationTests.kt | 6 -- .../fhirengine/azure/FhirFunctionTests.kt | 3 - 7 files changed, 1 insertion(+), 136 deletions(-) diff --git a/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt b/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt index ce5b7311cc8..eb178fd1f77 100644 --- a/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt +++ b/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt @@ -34,8 +34,7 @@ class FHIRFunctions( private val databaseAccess: DatabaseAccess = BaseEngine.databaseAccessSingleton, private val queueAccess: QueueAccess = QueueAccess, ) : Logging { - - + /** * An azure function for ingesting full-ELR HL7 data and converting it to FHIR */ @@ -48,27 +47,8 @@ class FHIRFunctions( @BindingName("DequeueCount") dequeueCount: Int = 1, ) { process(message, dequeueCount, FHIRConverter(), ActionHistory(TaskAction.convert)) - //doConvert(message, dequeueCount, FHIRConverter()) } -// /** -// * Functionality separated from azure function call so a mocked fhirEngine can be passed in for testing. -// * Reads the [message] passed in and processes it using the appropriate [fhirEngine]. If there is an error -// * the [dequeueCount] is tracked as part of the log. -// * [actionHistory] is an optional parameter for use in testing -// */ -// internal fun doConvert( -// message: String, -// dequeueCount: Int, -// fhirEngine: FHIREngine, -// actionHistory: ActionHistory = ActionHistory(TaskAction.convert), -// ) { -// val messagesToDispatch = runFhirEngine(message, dequeueCount, fhirEngine, actionHistory) -// messagesToDispatch.forEach { -// it.send(queueAccess) -// } -// } - // TODO: remove after route queue empty (see https://github.com/CDCgov/prime-reportstream/issues/15039) /** * An azure function for routing full-ELR FHIR data. @@ -81,29 +61,9 @@ class FHIRFunctions( // Number of times this message has been dequeued @BindingName("DequeueCount") dequeueCount: Int = 1, ) { - //doRoute(message, dequeueCount, FHIRRouter()) process(message, dequeueCount, FHIRRouter(), ActionHistory(TaskAction.route)) } -// // TODO: remove after route queue empty (see https://github.com/CDCgov/prime-reportstream/issues/15039) -// /** -// * Functionality separated from azure function call so a mocked fhirEngine can be passed in for testing. -// * Reads the [message] passed in and processes it using the appropriate [fhirEngine]. If there is an error -// * the [dequeueCount] is tracked as part of the log. -// * [actionHistory] is an optional parameter for use in testing -// */ -// internal fun doRoute( -// message: String, -// dequeueCount: Int, -// fhirEngine: FHIRRouter, -// actionHistory: ActionHistory = ActionHistory(TaskAction.route), -// ) { -// val messagesToDispatch = runFhirEngine(message, dequeueCount, fhirEngine, actionHistory) -// messagesToDispatch.forEach { -// it.send(queueAccess) -// } -// } - /** * An azure function for selecting valid destinations for inbound full-ELR FHIR data. */ @@ -115,29 +75,9 @@ class FHIRFunctions( // Number of times this message has been dequeued @BindingName("DequeueCount") dequeueCount: Int = 1, ) { - //doDestinationFilter(message, dequeueCount, FHIRDestinationFilter()) process(message, dequeueCount, FHIRDestinationFilter(), ActionHistory(TaskAction.destination_filter)) } -// /** -// * Functionality separated from azure function call so a mocked fhirEngine can be passed in for testing. -// * Reads the [message] passed in and processes it using the appropriate [fhirEngine]. If there is an error -// * the [dequeueCount] is tracked as part of the log. -// * [actionHistory] is an optional parameter for use in testing -// */ -// internal fun doDestinationFilter( -// message: String, -// dequeueCount: Int, -// fhirEngine: FHIRDestinationFilter, -// actionHistory: ActionHistory = ActionHistory(TaskAction.destination_filter), -// ) { -// val messagesToDispatch = runFhirEngine(message, dequeueCount, fhirEngine, actionHistory) -// -// messagesToDispatch.forEach { -// it.send(queueAccess) -// } -// } - /** * An azure function for running receiver filters on full-ELR FHIR data */ @@ -149,28 +89,9 @@ class FHIRFunctions( // Number of times this message has been dequeued @BindingName("DequeueCount") dequeueCount: Int = 1, ) { - //doReceiverFilter(message, dequeueCount, FHIRReceiverFilter()) process(message, dequeueCount, FHIRReceiverFilter(), ActionHistory(TaskAction.receiver_filter)) } -// /** -// * Functionality separated from azure function call so a mocked fhirEngine can be passed in for testing. -// * Reads the [message] passed in and processes it using the appropriate [fhirEngine]. If there is an error -// * the [dequeueCount] is tracked as part of the log. -// * [actionHistory] is an optional parameter for use in testing -// */ -// internal fun doReceiverFilter( -// message: String, -// dequeueCount: Int, -// fhirEngine: FHIRReceiverFilter, -// actionHistory: ActionHistory = ActionHistory(TaskAction.receiver_filter), -// ) { -// val messagesToDispatch = runFhirEngine(message, dequeueCount, fhirEngine, actionHistory) -// messagesToDispatch.forEach { -// it.send(queueAccess) -// } -// } - /** * An azure function for translating full-ELR FHIR data. */ @@ -182,29 +103,9 @@ class FHIRFunctions( // Number of times this message has been dequeued @BindingName("DequeueCount") dequeueCount: Int = 1, ) { - //doTranslate(message, dequeueCount, FHIRTranslator()) process(message, dequeueCount, FHIRTranslator(), ActionHistory(TaskAction.translate)) } -// /** -// * Functionality separated from azure function call so a mocked fhirEngine can be passed in for testing. -// * Reads the [message] passed in and processes it using the appropriate [fhirEngine]. If there is an error -// * the [dequeueCount] is tracked as part of the log. -// * [actionHistory] is an optional parameter for use in testing -// */ -// fun doTranslate( -// message: String, -// dequeueCount: Int, -// fhirEngine: FHIRTranslator, -// actionHistory: ActionHistory = ActionHistory(TaskAction.translate), -// ) { -// val messagesToDispatch = runFhirEngine(message, dequeueCount, fhirEngine, actionHistory) -// // Only dispatches event if Topic.isSendOriginal was true -// messagesToDispatch.forEach { -// it.send(queueAccess) -// } -// } - /** * Functionality separated from azure function call so a mocked fhirEngine can be passed in for testing. * Reads the [message] passed in and processes it using the appropriate [fhirEngine]. If there is an error diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FHIRConverterIntegrationTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FHIRConverterIntegrationTests.kt index d6a1b8c7c32..1a10f947e62 100644 --- a/prime-router/src/test/kotlin/fhirengine/azure/FHIRConverterIntegrationTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/azure/FHIRConverterIntegrationTests.kt @@ -242,7 +242,6 @@ class FHIRConverterIntegrationTests { val queueMessage = generateQueueMessage(receiveReport, receivedReportContents, hl7SenderWithNoTransform) val fhirFunctions = createFHIRFunctionsInstance() - //fhirFunctions.doConvert(queueMessage, 1, createFHIRConverter()) fhirFunctions.process(queueMessage, 1, createFHIRConverter(), ActionHistory(TaskAction.convert)) ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> @@ -408,7 +407,6 @@ class FHIRConverterIntegrationTests { ) val fhirFunctions = createFHIRFunctionsInstance() - //fhirFunctions.doConvert(queueMessage, 1, createFHIRConverter()) fhirFunctions.process(queueMessage, 1, createFHIRConverter(), ActionHistory(TaskAction.convert)) ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> @@ -533,7 +531,6 @@ class FHIRConverterIntegrationTests { val queueMessage = generateQueueMessage(receiveReport, receivedReportContents, senderWithValidation) val fhirFunctions = createFHIRFunctionsInstance() - //fhirFunctions.doConvert(queueMessage, 1, createFHIRConverter()) fhirFunctions.process(queueMessage, 1, createFHIRConverter(), ActionHistory(TaskAction.convert)) ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> @@ -642,7 +639,6 @@ class FHIRConverterIntegrationTests { val queueMessage = generateQueueMessage(receiveReport, receivedReportContents, hl7Sender) val fhirFunctions = createFHIRFunctionsInstance() - //fhirFunctions.doConvert(queueMessage, 1, createFHIRConverter()) fhirFunctions.process(queueMessage, 1, createFHIRConverter(), ActionHistory(TaskAction.convert)) ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> @@ -715,7 +711,6 @@ class FHIRConverterIntegrationTests { val queueMessage = generateQueueMessage(receiveReport, receivedReportContents, hl7Sender) val fhirFunctions = createFHIRFunctionsInstance() - //fhirFunctions.doConvert(queueMessage, 1, createFHIRConverter()) fhirFunctions.process(queueMessage, 1, createFHIRConverter(), ActionHistory(TaskAction.convert)) verify(exactly = 0) { diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FHIRDestinationFilterIntegrationTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FHIRDestinationFilterIntegrationTests.kt index c86aa02fbf7..9fcba706a2d 100644 --- a/prime-router/src/test/kotlin/fhirengine/azure/FHIRDestinationFilterIntegrationTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/azure/FHIRDestinationFilterIntegrationTests.kt @@ -186,7 +186,6 @@ class FHIRDestinationFilterIntegrationTests : Logging { val destinationFilter = createDestinationFilter(azureEventsService, org) // execute - // fhirFunctions.doDestinationFilter(queueMessage, 1, destinationFilter) fhirFunctions.process(queueMessage, 1, destinationFilter, ActionHistory(TaskAction.destination_filter)) // check results @@ -276,7 +275,6 @@ class FHIRDestinationFilterIntegrationTests : Logging { val destinationFilter = createDestinationFilter(azureEventsService, org) // execute - //fhirFunctions.doDestinationFilter(queueMessage, 1, destinationFilter) fhirFunctions.process(queueMessage, 1, destinationFilter, ActionHistory(TaskAction.destination_filter)) // check results @@ -370,7 +368,6 @@ class FHIRDestinationFilterIntegrationTests : Logging { val destinationFilter = createDestinationFilter(azureEventsService, org) // execute - // fhirFunctions.doDestinationFilter(queueMessage, 1, destinationFilter) fhirFunctions.process(queueMessage, 1, destinationFilter, ActionHistory(TaskAction.destination_filter)) // no messages should have been routed due to filter diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FHIRReceiverFilterIntegrationTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FHIRReceiverFilterIntegrationTests.kt index 511f7fd171c..50a0a0d6b63 100644 --- a/prime-router/src/test/kotlin/fhirengine/azure/FHIRReceiverFilterIntegrationTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/azure/FHIRReceiverFilterIntegrationTests.kt @@ -264,7 +264,6 @@ class FHIRReceiverFilterIntegrationTests : Logging { val fhirFunctions = UniversalPipelineTestUtils.createFHIRFunctionsInstance() // execute - //fhirFunctions.doReceiverFilter(queueMessage, 1, receiverFilter) fhirFunctions.process(queueMessage, 1, receiverFilter, ActionHistory(TaskAction.receiver_filter)) // check results @@ -346,7 +345,6 @@ class FHIRReceiverFilterIntegrationTests : Logging { val fhirFunctions = UniversalPipelineTestUtils.createFHIRFunctionsInstance() // execute - //fhirFunctions.doReceiverFilter(queueMessage, 1, receiverFilter) fhirFunctions.process(queueMessage, 1, receiverFilter, ActionHistory(TaskAction.receiver_filter)) // check results @@ -444,7 +442,6 @@ class FHIRReceiverFilterIntegrationTests : Logging { val fhirFunctions = UniversalPipelineTestUtils.createFHIRFunctionsInstance() // execute - //fhirFunctions.doReceiverFilter(queueMessage, 1, receiverFilter) fhirFunctions.process(queueMessage, 1, receiverFilter, ActionHistory(TaskAction.receiver_filter)) // check results @@ -516,7 +513,6 @@ class FHIRReceiverFilterIntegrationTests : Logging { val fhirFunctions = UniversalPipelineTestUtils.createFHIRFunctionsInstance() // execute - //fhirFunctions.doReceiverFilter(queueMessage, 1, receiverFilter) fhirFunctions.process(queueMessage, 1, receiverFilter, ActionHistory(TaskAction.receiver_filter)) // check results @@ -619,7 +615,6 @@ class FHIRReceiverFilterIntegrationTests : Logging { val fhirFunctions = UniversalPipelineTestUtils.createFHIRFunctionsInstance() // execute - //fhirFunctions.doReceiverFilter(queueMessage, 1, receiverFilter) fhirFunctions.process(queueMessage, 1, receiverFilter, ActionHistory(TaskAction.receiver_filter)) // check results @@ -700,7 +695,6 @@ class FHIRReceiverFilterIntegrationTests : Logging { val fhirFunctions = UniversalPipelineTestUtils.createFHIRFunctionsInstance() // execute - //fhirFunctions.doReceiverFilter(queueMessage, 1, receiverFilter) fhirFunctions.process(queueMessage, 1, receiverFilter, ActionHistory(TaskAction.receiver_filter)) // check results @@ -796,7 +790,6 @@ class FHIRReceiverFilterIntegrationTests : Logging { val fhirFunctions = UniversalPipelineTestUtils.createFHIRFunctionsInstance() // execute - //fhirFunctions.doReceiverFilter(queueMessage, 1, receiverFilter) fhirFunctions.process(queueMessage, 1, receiverFilter, ActionHistory(TaskAction.receiver_filter)) // check queue message @@ -913,7 +906,6 @@ class FHIRReceiverFilterIntegrationTests : Logging { val fhirFunctions = UniversalPipelineTestUtils.createFHIRFunctionsInstance() // execute - //fhirFunctions.doReceiverFilter(queueMessage, 1, receiverFilter) fhirFunctions.process(queueMessage, 1, receiverFilter, ActionHistory(TaskAction.receiver_filter)) // check results @@ -984,7 +976,6 @@ class FHIRReceiverFilterIntegrationTests : Logging { val fhirFunctions = UniversalPipelineTestUtils.createFHIRFunctionsInstance() // execute - //fhirFunctions.doReceiverFilter(queueMessage, 1, receiverFilter) fhirFunctions.process(queueMessage, 1, receiverFilter, ActionHistory(TaskAction.receiver_filter)) // check results @@ -1051,7 +1042,6 @@ class FHIRReceiverFilterIntegrationTests : Logging { val fhirFunctions = UniversalPipelineTestUtils.createFHIRFunctionsInstance() // execute - //fhirFunctions.doReceiverFilter(queueMessage, 1, receiverFilter) fhirFunctions.process(queueMessage, 1, receiverFilter, ActionHistory(TaskAction.receiver_filter)) // check queue diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FHIRRouterIntegrationTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FHIRRouterIntegrationTests.kt index 40bede469eb..5a0319af4dd 100644 --- a/prime-router/src/test/kotlin/fhirengine/azure/FHIRRouterIntegrationTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/azure/FHIRRouterIntegrationTests.kt @@ -491,7 +491,6 @@ class FHIRRouterIntegrationTests : Logging { val org = createOrganizationWithReceivers(receiverList) val fhirRouter = createFHIRRouter(org) - //fhirFunctions.doRoute(queueMessage, 1, fhirRouter) fhirFunctions.process(queueMessage, 1, fhirRouter, ActionHistory(TaskAction.route)) // check results @@ -591,7 +590,6 @@ class FHIRRouterIntegrationTests : Logging { val receivers = createReceivers(receiverSetupData) val org = createOrganizationWithReceivers(receivers) val fhirRouter = createFHIRRouter(org) - //fhirFunctions.doRoute(queueMessage, 1, fhirRouter) fhirFunctions.process(queueMessage, 1, fhirRouter, ActionHistory(TaskAction.route)) // check results @@ -687,7 +685,6 @@ class FHIRRouterIntegrationTests : Logging { val receivers = createReceivers(listOf(ReceiverSetupData("x", jurisdictionalFilter = jurisdictionalFilterCo))) val org = createOrganizationWithReceivers(receivers) val fhirRouter = createFHIRRouter(org) - //fhirFunctions.doRoute(queueMessage, 1, fhirRouter) fhirFunctions.process(queueMessage, 1, fhirRouter, ActionHistory(TaskAction.route)) // check results @@ -755,7 +752,6 @@ class FHIRRouterIntegrationTests : Logging { val receivers = createReceivers(receiverSetupData) val org = createOrganizationWithReceivers(receivers) val fhirRouter = createFHIRRouter(org) - //fhirFunctions.doRoute(queueMessage, 1, fhirRouter) fhirFunctions.process(queueMessage, 1, fhirRouter, ActionHistory(TaskAction.route)) // no messages should have been routed due to filter @@ -808,7 +804,6 @@ class FHIRRouterIntegrationTests : Logging { val receivers = createReceivers(receiverSetupData) val org = createOrganizationWithReceivers(receivers) val fhirRouter = createFHIRRouter(org) - //fhirFunctions.doRoute(queueMessage, 1, fhirRouter) fhirFunctions.process(queueMessage, 1, fhirRouter, ActionHistory(TaskAction.route)) // no messages should have been routed due to filter @@ -876,7 +871,6 @@ class FHIRRouterIntegrationTests : Logging { val receivers = createReceivers(receiverSetupData) val org = createOrganizationWithReceivers(receivers) val fhirRouter = createFHIRRouter(org) - //fhirFunctions.doRoute(queueMessage, 1, fhirRouter) fhirFunctions.process(queueMessage, 1, fhirRouter, ActionHistory(TaskAction.route)) // check results @@ -952,7 +946,6 @@ class FHIRRouterIntegrationTests : Logging { val receivers = createReceivers(receiverSetupData) val org = createOrganizationWithReceivers(receivers) val fhirRouter = createFHIRRouter(org) - //fhirFunctions.doRoute(queueMessage, 1, fhirRouter) fhirFunctions.process(queueMessage, 1, fhirRouter, ActionHistory(TaskAction.route)) // no messages should have been routed due to filter @@ -1015,7 +1008,6 @@ class FHIRRouterIntegrationTests : Logging { val receivers = createReceivers(receiverSetupData) val org = createOrganizationWithReceivers(receivers) val fhirRouter = createFHIRRouter(org) - //fhirFunctions.doRoute(queueMessage, 1, fhirRouter) fhirFunctions.process(queueMessage, 1, fhirRouter, ActionHistory(TaskAction.route)) // check results @@ -1092,7 +1084,6 @@ class FHIRRouterIntegrationTests : Logging { val receivers = createReceivers(receiverSetupData) val org = createOrganizationWithReceivers(receivers) val fhirRouter = createFHIRRouter(org) - //fhirFunctions.doRoute(queueMessage, 1, fhirRouter) fhirFunctions.process(queueMessage, 1, fhirRouter, ActionHistory(TaskAction.route)) // check results diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FhirFunctionIntegrationTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FhirFunctionIntegrationTests.kt index 88d805c9736..c984ea59738 100644 --- a/prime-router/src/test/kotlin/fhirengine/azure/FhirFunctionIntegrationTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/azure/FhirFunctionIntegrationTests.kt @@ -399,7 +399,6 @@ class FhirFunctionIntegrationTests { databaseAccess = ReportStreamTestDatabaseContainer.testDatabaseAccess ) assertThrows { - //fhirFunc.doConvert(queueMessage, 1, fhirEngine, actionHistory) fhirFunc.process(queueMessage, 1, fhirEngine, actionHistory) } @@ -480,7 +479,6 @@ class FhirFunctionIntegrationTests { workflowEngine, databaseAccess = ReportStreamTestDatabaseContainer.testDatabaseAccess ) - //fhirFunc.doConvert(queueMessage, 1, fhirEngine, actionHistory) fhirFunc.process(queueMessage, 1, fhirEngine, actionHistory) val processTask = ReportStreamTestDatabaseContainer.testDatabaseAccess.fetchTask(report.id) @@ -562,7 +560,6 @@ class FhirFunctionIntegrationTests { workflowEngine, databaseAccess = ReportStreamTestDatabaseContainer.testDatabaseAccess ) - //fhirFunc.doConvert(queueMessage, 1, fhirEngine, actionHistory) fhirFunc.process(queueMessage, 1, fhirEngine, actionHistory) val processTask = ReportStreamTestDatabaseContainer.testDatabaseAccess.fetchTask(report.id) @@ -674,7 +671,6 @@ class FhirFunctionIntegrationTests { actionLogger = actionLogger, databaseAccess = ReportStreamTestDatabaseContainer.testDatabaseAccess ) - //fhirFunc.doConvert(queueMessage, 1, fhirEngine, actionHistory) fhirFunc.process(queueMessage, 1, fhirEngine, actionHistory) val processTask = ReportStreamTestDatabaseContainer.testDatabaseAccess.fetchTask(report.id) @@ -1219,7 +1215,6 @@ class FhirFunctionIntegrationTests { workflowEngine, databaseAccess = ReportStreamTestDatabaseContainer.testDatabaseAccess ) - //fhirFunc.doConvert(queueMessage, 1, fhirEngine, actionHistory) fhirFunc.process(queueMessage, 1, fhirEngine, actionHistory) val processTask = ReportStreamTestDatabaseContainer.testDatabaseAccess.fetchTask(report.id) @@ -1302,7 +1297,6 @@ class FhirFunctionIntegrationTests { databaseAccess = ReportStreamTestDatabaseContainer.testDatabaseAccess ) - //fhirFunc.doConvert(queueMessage, 1, fhirEngine, actionHistory) fhirFunc.process(queueMessage, 1, fhirEngine, actionHistory) val processTask = ReportStreamTestDatabaseContainer.testDatabaseAccess.fetchTask(report.id) diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FhirFunctionTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FhirFunctionTests.kt index 7107f7efca4..6957b3bb873 100644 --- a/prime-router/src/test/kotlin/fhirengine/azure/FhirFunctionTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/azure/FhirFunctionTests.kt @@ -182,7 +182,6 @@ class FhirFunctionTests { "\"ignore.ignore-full-elr\",\"schemaName\":\"someSchema\",\"topic\":\"full-elr\"}" // act - //fhirFunc.doConvert(queueMessage, 1, fhirEngine, actionHistory) fhirFunc.process(queueMessage, 1, fhirEngine, actionHistory) // assert @@ -270,7 +269,6 @@ class FhirFunctionTests { "\"blobSubFolderName\":\"ignore.ignore-full-elr\",\"topic\":\"full-elr\"}" // act - //fhirFunc.doRoute(queueMessage, 1, fhirEngine, actionHistory) fhirFunc.process(queueMessage, 1, fhirEngine, actionHistory) // assert @@ -349,7 +347,6 @@ class FhirFunctionTests { "\"blobSubFolderName\":\"ignore.ignore-full-elr\",\"topic\":\"full-elr\",\"receiverFullName\":\"elr.phd\"}" // act - //fhirFunc.doTranslate(queueMessage, 1, fhirEngine, actionHistory) fhirFunc.process(queueMessage, 1, fhirEngine, actionHistory) // assert From e48afc19c7276cc0982f61cd3c7f7fab740911a7 Mon Sep 17 00:00:00 2001 From: David Holiday Date: Wed, 31 Jul 2024 10:15:37 -0600 Subject: [PATCH 09/10] done and done --- .../src/main/kotlin/fhirengine/azure/FHIRFunctions.kt | 2 +- .../src/main/kotlin/fhirengine/engine/FHIREngine.kt | 1 - .../kotlin/fhirengine/azure/FhirFunctionIntegrationTests.kt | 6 +++--- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt b/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt index eb178fd1f77..d1dccbe7e71 100644 --- a/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt +++ b/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt @@ -34,7 +34,7 @@ class FHIRFunctions( private val databaseAccess: DatabaseAccess = BaseEngine.databaseAccessSingleton, private val queueAccess: QueueAccess = QueueAccess, ) : Logging { - + /** * An azure function for ingesting full-ELR HL7 data and converting it to FHIR */ diff --git a/prime-router/src/main/kotlin/fhirengine/engine/FHIREngine.kt b/prime-router/src/main/kotlin/fhirengine/engine/FHIREngine.kt index fbf6ee72e03..daea443c748 100644 --- a/prime-router/src/main/kotlin/fhirengine/engine/FHIREngine.kt +++ b/prime-router/src/main/kotlin/fhirengine/engine/FHIREngine.kt @@ -23,7 +23,6 @@ import gov.cdc.prime.router.serializers.Hl7Serializer import org.jooq.Field import java.time.OffsetDateTime - /** * All logical processing for full ELR / FHIR processing should be within this class. * [metadata] mockable metadata diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FhirFunctionIntegrationTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FhirFunctionIntegrationTests.kt index c984ea59738..c101fe4f14f 100644 --- a/prime-router/src/test/kotlin/fhirengine/azure/FhirFunctionIntegrationTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/azure/FhirFunctionIntegrationTests.kt @@ -786,7 +786,7 @@ class FhirFunctionIntegrationTests { databaseAccess = ReportStreamTestDatabaseContainer.testDatabaseAccess ) - //fhirFunc.doRoute(queueMessage, 1, fhirEngine, actionHistory) + // fhirFunc.doRoute(queueMessage, 1, fhirEngine, actionHistory) fhirFunc.process(queueMessage, 1, fhirEngine, actionHistory) val convertTask = ReportStreamTestDatabaseContainer.testDatabaseAccess.fetchTask(report.id) @@ -943,7 +943,7 @@ class FhirFunctionIntegrationTests { databaseAccess = ReportStreamTestDatabaseContainer.testDatabaseAccess ) - //fhirFunc.doTranslate(queueMessage, 1, fhirEngine, actionHistory) + // fhirFunc.doTranslate(queueMessage, 1, fhirEngine, actionHistory) fhirFunc.process(queueMessage, 1, fhirEngine, actionHistory) // verify task and report_file tables were updated correctly in the Translate function (new task and new @@ -1114,7 +1114,7 @@ class FhirFunctionIntegrationTests { databaseAccess = ReportStreamTestDatabaseContainer.testDatabaseAccess ) - //fhirFunc.doTranslate(queueMessage, 1, fhirEngine, actionHistory) + // fhirFunc.doTranslate(queueMessage, 1, fhirEngine, actionHistory) fhirFunc.process(queueMessage, 1, fhirEngine, actionHistory) // verify task and report_file tables were updated correctly in the Translate function From 6fb126ca065d19f125b6f5f0e45e6a9c264d634f Mon Sep 17 00:00:00 2001 From: David Holiday Date: Tue, 6 Aug 2024 11:58:33 -0600 Subject: [PATCH 10/10] pr comment --- .../test/kotlin/fhirengine/azure/FhirFunctionIntegrationTests.kt | 1 - 1 file changed, 1 deletion(-) diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FhirFunctionIntegrationTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FhirFunctionIntegrationTests.kt index c101fe4f14f..3e3fab4f180 100644 --- a/prime-router/src/test/kotlin/fhirengine/azure/FhirFunctionIntegrationTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/azure/FhirFunctionIntegrationTests.kt @@ -786,7 +786,6 @@ class FhirFunctionIntegrationTests { databaseAccess = ReportStreamTestDatabaseContainer.testDatabaseAccess ) - // fhirFunc.doRoute(queueMessage, 1, fhirEngine, actionHistory) fhirFunc.process(queueMessage, 1, fhirEngine, actionHistory) val convertTask = ReportStreamTestDatabaseContainer.testDatabaseAccess.fetchTask(report.id)