Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Platform/13199/fhir function dynamic message routing #15273

26 changes: 5 additions & 21 deletions prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import gov.cdc.prime.router.fhirengine.engine.elrConvertQueueName
import gov.cdc.prime.router.fhirengine.engine.elrDestinationFilterQueueName
import gov.cdc.prime.router.fhirengine.engine.elrReceiverFilterQueueName
import gov.cdc.prime.router.fhirengine.engine.elrRoutingQueueName
import gov.cdc.prime.router.fhirengine.engine.elrSendQueueName
import gov.cdc.prime.router.fhirengine.engine.elrTranslationQueueName
import org.apache.commons.lang3.StringUtils
import org.apache.logging.log4j.kotlin.Logging
Expand Down Expand Up @@ -64,10 +63,7 @@ class FHIRFunctions(
) {
val messagesToDispatch = runFhirEngine(message, dequeueCount, fhirEngine, actionHistory)
messagesToDispatch.forEach {
queueAccess.sendMessage(
elrDestinationFilterQueueName,
it.serialize()
)
it.send(queueAccess)
}
}

Expand Down Expand Up @@ -101,10 +97,7 @@ class FHIRFunctions(
) {
val messagesToDispatch = runFhirEngine(message, dequeueCount, fhirEngine, actionHistory)
messagesToDispatch.forEach {
queueAccess.sendMessage(
elrTranslationQueueName,
it.serialize()
)
it.send(queueAccess)
}
}

Expand Down Expand Up @@ -137,10 +130,7 @@ class FHIRFunctions(
val messagesToDispatch = runFhirEngine(message, dequeueCount, fhirEngine, actionHistory)

messagesToDispatch.forEach {
queueAccess.sendMessage(
elrReceiverFilterQueueName,
it.serialize()
)
it.send(queueAccess)
}
}

Expand Down Expand Up @@ -172,10 +162,7 @@ class FHIRFunctions(
) {
val messagesToDispatch = runFhirEngine(message, dequeueCount, fhirEngine, actionHistory)
messagesToDispatch.forEach {
queueAccess.sendMessage(
elrTranslationQueueName,
it.serialize()
)
it.send(queueAccess)
}
}

Expand Down Expand Up @@ -208,10 +195,7 @@ class FHIRFunctions(
val messagesToDispatch = runFhirEngine(message, dequeueCount, fhirEngine, actionHistory)
// Only dispatches event if Topic.isSendOriginal was true
messagesToDispatch.forEach {
queueAccess.sendMessage(
elrSendQueueName,
it.serialize()
)
it.send(queueAccess)
}
}

Expand Down
42 changes: 42 additions & 0 deletions prime-router/src/main/kotlin/fhirengine/engine/QueueMessage.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import gov.cdc.prime.router.ReportId
import gov.cdc.prime.router.Topic
import gov.cdc.prime.router.azure.BlobAccess
import gov.cdc.prime.router.azure.Event
import gov.cdc.prime.router.azure.QueueAccess
import java.util.Base64
import java.util.UUID

Expand All @@ -35,6 +36,47 @@ private const val MESSAGE_SIZE_LIMIT = 64 * 1000
JsonSubTypes.Type(ReportEventQueueMessage::class, name = "report")
)
abstract class QueueMessage {
fun send(queueAccess: QueueAccess) {
when (this.javaClass) {
FhirConvertQueueMessage::class.java -> {
queueAccess.sendMessage(
elrConvertQueueName,
serialize()
)
}
FhirRouteQueueMessage::class.java -> {
queueAccess.sendMessage(
elrRoutingQueueName,
serialize()
)
}
FhirDestinationFilterQueueMessage::class.java -> {
queueAccess.sendMessage(
elrDestinationFilterQueueName,
serialize()
)
}
FhirReceiverFilterQueueMessage::class.java -> {
queueAccess.sendMessage(
elrReceiverFilterQueueName,
serialize()
)
}
FhirTranslateQueueMessage::class.java -> {
queueAccess.sendMessage(
elrTranslationQueueName,
serialize()
)
}
ReportEventQueueMessage::class.java -> {
queueAccess.sendMessage(
elrSendQueueName,
serialize()
)
}
}
}

fun serialize(): String {
val bytes = mapper.writeValueAsBytes(this)
check(bytes.size < MESSAGE_SIZE_LIMIT) { "Message is too big for the queue." }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import gov.cdc.prime.router.fhirengine.engine.FHIREngine
import gov.cdc.prime.router.fhirengine.engine.FHIRRouter
import gov.cdc.prime.router.fhirengine.engine.FHIRTranslator
import gov.cdc.prime.router.fhirengine.engine.FhirDestinationFilterQueueMessage
import gov.cdc.prime.router.fhirengine.engine.FhirRouteQueueMessage
import gov.cdc.prime.router.fhirengine.engine.FhirTranslateQueueMessage
import gov.cdc.prime.router.fhirengine.engine.QueueMessage
import gov.cdc.prime.router.fhirengine.engine.elrDestinationFilterQueueName
import gov.cdc.prime.router.fhirengine.engine.elrTranslationQueueName
Expand Down Expand Up @@ -244,12 +244,13 @@ class FhirFunctionTests {
emptyMap(),
emptyList()
)
val message = FhirRouteQueueMessage(
val message = FhirTranslateQueueMessage(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the story here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the test logic wasn't testing what it was supposed to be testing

report.id,
"",
"",
"ignore.ignore-full-elr",
Topic.FULL_ELR
Topic.FULL_ELR,
""
)
every { fhirEngine.doWork(any(), any(), any()) } returns listOf(
FHIREngine.FHIREngineRunResult(
Expand Down