diff --git a/processing-status-api-function-app/src/main/java/gov/cdc/ocio/processingstatusapi/FunctionJavaWrappers.java b/processing-status-api-function-app/src/main/java/gov/cdc/ocio/processingstatusapi/FunctionJavaWrappers.java index c4512df8..1803e511 100644 --- a/processing-status-api-function-app/src/main/java/gov/cdc/ocio/processingstatusapi/FunctionJavaWrappers.java +++ b/processing-status-api-function-app/src/main/java/gov/cdc/ocio/processingstatusapi/FunctionJavaWrappers.java @@ -257,16 +257,15 @@ public HttpResponseMessage getHL7DirectIndirectMessageCounts( return new GetReportCountsFunction(request).getHL7DirectIndirectMessageCounts(); } - @FunctionName("GetUnfinishedUploadCounts") + @FunctionName("GetBadMetadataAndUnfinishedUploadCounts") public HttpResponseMessage GetUnfinishedUploadCounts( @HttpTrigger( name = "req", methods = {HttpMethod.GET}, - route = "report/counts/totalUnfinishedUploads/{dataStreamId}", + route = "report/counts/badMetadataAndUnfinishedUploadCounts", authLevel = AuthorizationLevel.ANONYMOUS - ) HttpRequestMessage> request, - @BindingName("dataStreamId") String dataStreamId) { - return new GetReportCountsFunction(request).getUnfinishedUploadCounts(dataStreamId); + ) HttpRequestMessage> request) { + return new GetReportCountsFunction(request).getBadMetadataAndUnfinishedUploadCounts(); } @FunctionName("GetInvalidMessageCounts") @@ -280,6 +279,17 @@ public HttpResponseMessage GetInvalidMessageCounts( return new GetReportCountsFunction(request).getInvalidMessageCounts(); } + @FunctionName("GetRollupCounts") + public HttpResponseMessage getRollupCounts( + @HttpTrigger( + name = "req", + methods = {HttpMethod.GET}, + route = "report/counts/submissions/rollup", + authLevel = AuthorizationLevel.ANONYMOUS + ) HttpRequestMessage> request) { + return new GetReportCountsFunction(request).getRollupCounts(); + } + @FunctionName("GetStatusByUploadId") public HttpResponseMessage getStatusByUploadId( @HttpTrigger( diff --git a/processing-status-api-function-app/src/main/kotlin/gov/cdc/ocio/processingstatusapi/functions/status/GetReportCountsFunction.kt b/processing-status-api-function-app/src/main/kotlin/gov/cdc/ocio/processingstatusapi/functions/status/GetReportCountsFunction.kt index ac46d361..1c0354fa 100644 --- a/processing-status-api-function-app/src/main/kotlin/gov/cdc/ocio/processingstatusapi/functions/status/GetReportCountsFunction.kt +++ b/processing-status-api-function-app/src/main/kotlin/gov/cdc/ocio/processingstatusapi/functions/status/GetReportCountsFunction.kt @@ -523,7 +523,7 @@ class GetReportCountsFunction( val numCompletedUploadingSqlQuery = ( "select " + "value count(1) " - + "from Reports r " + + "from $reportsContainerName r " + "where r.content.schema_name = 'upload' and r.content['offset'] = r.content.size and " + "r.dataStreamId = '$dataStreamId' and r.dataStreamRoute = '$dataStreamRoute' and $timeRangeWhereClause" ) @@ -537,7 +537,7 @@ class GetReportCountsFunction( val numUploadingSqlQuery = ( "select " + "value count(1) " - + "from Reports r " + + "from $reportsContainerName r " + "where r.content.schema_name = 'upload' and r.content['offset'] != r.content.size and " + "r.dataStreamId = '$dataStreamId' and r.dataStreamRoute = '$dataStreamRoute' and $timeRangeWhereClause" ) @@ -551,7 +551,7 @@ class GetReportCountsFunction( val numFailedSqlQuery = ( "select " + "value count(1) " - + "from Reports r " + + "from $reportsContainerName r " + "where r.content.schema_name = 'dex-metadata-verify' and r.content.issues != null and " + "r.dataStreamId = '$dataStreamId' and r.dataStreamRoute = '$dataStreamRoute' and $timeRangeWhereClause" ) @@ -580,28 +580,30 @@ class GetReportCountsFunction( } /** - *Get direct and in-direct message counts + * Get direct and in-direct message counts * - * * @return HttpResponseMessage + * @return HttpResponseMessage */ fun getHL7DirectIndirectMessageCounts(): HttpResponseMessage { val queryParams = prepareQueryParameters(request) - //Verify the request is complete + + // Verify the request is complete checkRequiredCountsQueryParams( - queryParams?.get("dataStreamId"), - queryParams?.get("dataStreamRoute"), - queryParams?.get("dateStart"), - queryParams?.get("dateEnd"), - queryParams?.get("daysInterval"), + queryParams["dataStreamId"], + queryParams["dataStreamRoute"], + queryParams["dateStart"], + queryParams["dateEnd"], + queryParams["daysInterval"], true )?.let { return it } - val timeRangeWhereClause: String try { - timeRangeWhereClause = buildSqlClauseForDateRange(queryParams?.get("daysInterval"), - queryParams?.get("dateStart"), queryParams?.get("dateEnd") + timeRangeWhereClause = buildSqlClauseForDateRange( + queryParams["daysInterval"], + queryParams["dateStart"], + queryParams["dateEnd"] ) } catch (e: Exception) { logger.error(e.localizedMessage) @@ -613,14 +615,14 @@ class GetReportCountsFunction( val directMessageQuery = ( "select value SUM(directCounts) " - + " FROM (select value SUM(r.content.stage.report.number_of_messages) from Reports r " - + " where r.content.schema_name = '${HL7Debatch.schemaDefinition.schemaName}' and " - + " r.dataStreamId = '${queryParams?.get("dataStreamId")}' and r.dataStreamRoute = '${queryParams["dataStreamRoute"]}' and $timeRangeWhereClause) as directCounts" + + "from (select value SUM(r.content.stage.report.number_of_messages) from $reportsContainerName r " + + "where r.content.schema_name = '${HL7Debatch.schemaDefinition.schemaName}' and " + + "r.dataStreamId = '${queryParams["dataStreamId"]}' and r.dataStreamRoute = '${queryParams["dataStreamRoute"]}' and $timeRangeWhereClause) as directCounts" ) val indirectMessageQuery = ( "select value count(redactedCount) from ( " - + "select * from Reports r where r.content.schema_name = '${HL7Redactor.schemaDefinition.schemaName}' and " + + "select * from $reportsContainerName r where r.content.schema_name = '${HL7Redactor.schemaDefinition.schemaName}' and " + "r.dataStreamId = '${queryParams["dataStreamId"]}' and r.dataStreamRoute = '${queryParams["dataStreamRoute"]}' and $timeRangeWhereClause) as redactedCount" ) @@ -652,25 +654,30 @@ class GetReportCountsFunction( .build() } - - fun getUnfinishedUploadCounts (dataStreamId: String): HttpResponseMessage { + /** + * Get the number of uploads stopped due to metadata issues and also unfinished upload counts. + * + * @return HttpResponseMessage + */ + fun getBadMetadataAndUnfinishedUploadCounts(): HttpResponseMessage { val queryParams = prepareQueryParameters(request) - //Verify the request is complete + // Verify the request is complete checkRequiredCountsQueryParams( - dataStreamId, - queryParams?.get("dataStreamRoute"), - queryParams?.get("dateStart"), - queryParams?.get("dateEnd"), - queryParams?.get("daysInterval"), + queryParams["dataStreamId"], + queryParams["dataStreamRoute"], + queryParams["dateStart"], + queryParams["dateEnd"], + queryParams["daysInterval"], true )?.let { return it } - val timeRangeWhereClause: String try { - timeRangeWhereClause = buildSqlClauseForDateRange(queryParams?.get("daysInterval"), - queryParams?.get("dateStart"), queryParams?.get("dateEnd") + timeRangeWhereClause = buildSqlClauseForDateRange( + queryParams["daysInterval"], + queryParams["dateStart"], + queryParams["dateEnd"] ) } catch (e: Exception) { logger.error(e.localizedMessage) @@ -680,25 +687,42 @@ class GetReportCountsFunction( .build() } - val unfinishedCountsQuery = ( + val startTime = System.currentTimeMillis() + + val badMetadataCountsQuery = ( "select value count(1) " - + " FROM Reports r where r.content.schema_name = 'dex-metadata-verify' and r.content.schema_name = 'upload' " - + " and r.dataStreamId = '$dataStreamId' and r.dataStreamRoute = '${queryParams["dataStreamRoute"]}' and $timeRangeWhereClause" + + "from $reportsContainerName r " + + "where r.dataStreamId = '${queryParams["dataStreamId"]}' and r.dataStreamRoute = '${queryParams["dataStreamRoute"]}' and " + + "r.content.schema_name = 'dex-metadata-verify' and " + + "ARRAY_LENGTH(r.content.issues) > 0 and $timeRangeWhereClause" ) - val startTime = System.currentTimeMillis() + val unfinishedUploadsCountsQuery = ( + "select value count(1) " + + "from $reportsContainerName r " + + "where r.dataStreamId = '${queryParams["dataStreamId"]}' and r.dataStreamRoute = '${queryParams["dataStreamRoute"]}' and " + + "r.content.schema_name = 'upload' and " + + "r.content['offset'] < r.content['size'] and $timeRangeWhereClause" + ) - val directCountResult = reportsContainer.queryItems( - unfinishedCountsQuery, CosmosQueryRequestOptions(), + val badMetadataCountsResult = reportsContainer.queryItems( + badMetadataCountsQuery, CosmosQueryRequestOptions(), Float::class.java ) + val badMetadataCount = badMetadataCountsResult.firstOrNull() ?: 0 - val unfinishedTotalItems = directCountResult.firstOrNull() ?: 0 + val unfinishedUploadsCountsResult = reportsContainer.queryItems( + unfinishedUploadsCountsQuery, CosmosQueryRequestOptions(), + Float::class.java + ) + + val unfinishedUploadsCount = unfinishedUploadsCountsResult.firstOrNull() ?: 0 val endTime = System.currentTimeMillis() val countsJson = JSONObject() - .put("unfinished_upload_counts", unfinishedTotalItems) + .put("bad_metadata_count", badMetadataCount) + .put("unfinished_upload_count", unfinishedUploadsCount) .put("query_time_millis", endTime - startTime) return request @@ -708,24 +732,32 @@ class GetReportCountsFunction( .build() } + /** + * Get the number of invalid messages count using two different methods. + * + * @return HttpResponseMessage + */ fun getInvalidMessageCounts() : HttpResponseMessage { val queryParams = prepareQueryParameters(request) + //Verify the request is complete checkRequiredCountsQueryParams( - queryParams?.get("dataStreamId"), - queryParams?.get("dataStreamRoute"), - queryParams?.get("dateStart"), - queryParams?.get("dateEnd"), - queryParams?.get("daysInterval"), + queryParams["dataStreamId"], + queryParams["dataStreamRoute"], + queryParams["dateStart"], + queryParams["dateEnd"], + queryParams["daysInterval"], true )?.let { return it } val timeRangeWhereClause: String try { - timeRangeWhereClause = buildSqlClauseForDateRange(queryParams?.get("daysInterval"), - queryParams?.get("dateStart"), queryParams?.get("dateEnd") + timeRangeWhereClause = buildSqlClauseForDateRange( + queryParams["daysInterval"], + queryParams["dateStart"], + queryParams["dateEnd"] ) } catch (e: Exception) { logger.error(e.localizedMessage) @@ -737,7 +769,7 @@ class GetReportCountsFunction( val directInvalidMessageQuery = ( "select value SUM(directCounts) " - + " FROM (select value SUM(r.content.stage.report.number_of_messages) from Reports r " + + " FROM (select value SUM(r.content.stage.report.number_of_messages) from $reportsContainerName r " + " where r.content.schema_name = '${HL7Redactor.schemaDefinition.schemaName}' and " + " r.dataStreamId = '${queryParams?.get("dataStreamId")}' and r.dataStreamRoute = '${queryParams["dataStreamRoute"]}' and $timeRangeWhereClause) as directCounts" ) @@ -752,7 +784,7 @@ class GetReportCountsFunction( val indirectInvalidMessageQuery = ( "select value count(invalidCounts) from ( " - + "select * from Reports r where r.content.schema_name != 'HL7-JSON-LAKE-TRANSFORMER' and " + + "select * from $reportsContainerName r where r.content.schema_name != 'HL7-JSON-LAKE-TRANSFORMER' and " + "r.dataStreamId = '${queryParams["dataStreamId"]}' and r.dataStreamRoute = '${queryParams["dataStreamRoute"]}' and $timeRangeWhereClause) as invalidCounts" ) @@ -773,14 +805,15 @@ class GetReportCountsFunction( Float::class.java ) - val directTotalItems = directRedactedCountResult.firstOrNull() ?: 0 + directStructureCountResult.firstOrNull()!! - ?: 0 - val inDirectTotalItems = indirectCountResult.firstOrNull() ?: 0 + val directRedactedCount = directRedactedCountResult.firstOrNull() ?: 0f + val directStructureCount = directStructureCountResult.firstOrNull() ?: 0f + val directTotalItems = directRedactedCount + directStructureCount + val inDirectTotalItems = indirectCountResult.firstOrNull() ?: 0f val endTime = System.currentTimeMillis() val countsJson = JSONObject() - .put("direct_counts", directTotalItems) - .put("indirect_counts", inDirectTotalItems) + .put("invalid_message_direct_counts", directTotalItems) + .put("invalid_message_indirect_counts", inDirectTotalItems) .put("query_time_millis", endTime - startTime) return request @@ -790,6 +823,66 @@ class GetReportCountsFunction( .build() } + /** + * Get a rollup of the counts for the data stream provided. + * + * @param request HttpRequestMessage> + * @return HttpResponseMessage + */ + fun getRollupCounts(): HttpResponseMessage { + + val queryParams = prepareQueryParameters(request) + + // Verify the request is complete + checkRequiredCountsQueryParams( + queryParams["dataStreamId"], + queryParams["dataStreamRoute"], + queryParams["dateStart"], + queryParams["dateEnd"], + queryParams["daysInterval"], + true + )?.let { return it } + + val timeRangeWhereClause: String + try { + timeRangeWhereClause = buildSqlClauseForDateRange( + queryParams["daysInterval"], + queryParams["dateStart"], + queryParams["dateEnd"] + ) + } catch (e: Exception) { + logger.error(e.localizedMessage) + return request + .createResponseBuilder(HttpStatus.BAD_REQUEST) + .body(e.localizedMessage) + .build() + } + + val rollupCountsQuery = ( + "select " + + "r.content.schema_name, r.content.schema_version, count(r.stageName) as counts, r.stageName " + + "from $reportsContainerName r where r.dataStreamId = '${queryParams["dataStreamId"]}' and " + + "r.dataStreamRoute = '${queryParams["dataStreamRoute"]}' and $timeRangeWhereClause " + + "group by r.stageName, r.content.schema_name, r.content.schema_version" + ) + + val rollupCountsResult = reportsContainer.queryItems( + rollupCountsQuery, CosmosQueryRequestOptions(), + StageCounts::class.java + ) + + val rollupCounts = mutableListOf() + if (rollupCountsResult.count() > 0) { + rollupCounts.addAll(rollupCountsResult.toList()) + } + + return request + .createResponseBuilder(HttpStatus.OK) + .header("Content-Type", "application/json") + .body(gson.toJson(rollupCounts)) + .build() + } + private fun prepareQueryParameters(request: HttpRequestMessage>): Map { val queryParams = request.queryParameters val dataStreamId = queryParams["data_stream_id"] diff --git a/processing-status-api-function-app/test/queries/README.md b/processing-status-api-function-app/test/queries/README.md new file mode 100644 index 00000000..511cbd1e --- /dev/null +++ b/processing-status-api-function-app/test/queries/README.md @@ -0,0 +1,269 @@ +# PS API Helpful Queries +Collection of helpful CosmosDB SQL queries. + +## Counts Queries + +### Report counts by schema name and stage name +```roomsql +select + count(1) as counts, r.content.schema_name, r.stageName + from Reports r where + r._ts >= 1713916800 and r._ts <= 1714003199 and r.dataStreamId = 'aims-celr' and r.dataStreamRoute = 'hl7' + group by r.content.schema_name, r.stageName +``` +Sample output: +```json +[ + { + "counts": 62, + "schema_name": "dex-file-copy", + "stageName": "dex-routing" + }, + { + "counts": 6357, + "schema_name": "DEX HL7v2 REDACTOR", + "stageName": "REDACTOR" + }, + { + "counts": 6113, + "schema_name": "upload", + "stageName": "dex-upload" + }, + { + "counts": 6089, + "schema_name": "DEX HL7v2 RECEIVER", + "stageName": "RECEIVER" + }, + { + "counts": 6417, + "schema_name": "DEX HL7v2 HL7-JSON-LAKE-TRANSFORMER", + "stageName": "HL7-JSON-LAKE-TRANSFORMER" + }, + { + "counts": 6381, + "schema_name": "DEX HL7v2 STRUCTURE-VALIDATOR", + "stageName": "STRUCTURE-VALIDATOR" + }, + { + "counts": 14527, + "schema_name": "dex-metadata-verify", + "stageName": "dex-metadata-verify" + }, + { + "counts": 6495, + "schema_name": "DEX HL7v2 LAKE-SEGMENTS-TRANSFORMER", + "stageName": "LAKE-SEGMENTS-TRANSFORMER" + }, + { + "counts": 7277, + "schema_name": "dex-file-copy", + "stageName": "dex-file-copy" + } +] +``` + +### Unique Upload IDs for "upload" +```roomsql +select count(unqiueUploadCounts) as uploadCounts + from (SELECT distinct r.uploadId FROM Reports r where + r.dataStreamId='aims-celr' and + r.dataStreamRoute='hl7' and + r.content.schema_name = 'upload' and + r._ts >= 1712620800 and r._ts <= 1712707199) as unqiueUploadCounts +``` +Sample output: +```json +[ + { + "uploadCounts": 12313 + } +] +``` + +### Check for duplicate uploadIds +```roomsql +select d.uploadId, d.tot_count from + (SELECT r.uploadId, count(1) as tot_count FROM r + where + r.dataStreamId='aims-celr' and + r.dataStreamRoute='hl7' and + r.content.schema_name = 'upload' and + r._ts >= 1712620800 and r._ts <= 1712707199 + group by r.uploadId) as d + where d.tot_count>1 +``` +> **_NOTE:_** If the system is operating as expected, this should return no results. However, if there is an issue you may see something like the following. +```json +[ + { + "uploadId": "4e8e2764b59abc45c6d8229ede879a06", + "tot_count": 2 + } +] +``` + +### Count of failed structure validations +```roomsql +select + count(not contains(upper(r.content.summary.current_status), 'VALID_MESSAGE') ? 1 : undefined) as invalid + from Reports r + where r.content.schema_name = 'DEX HL7v2 STRUCTURE-VALIDATOR' and + r._ts >= 1710302400 and r._ts <= 1710388799 +``` +Sample output: +```json +[ + { + "invalid": 791 + } +] +``` + +### Check for duplicate filenames +```roomsql +select d.filename, d.tot_count from( + SELECT r.content.metadata.filename, count(1) as tot_count + FROM Reports r + where + r.dataStreamId='aims-celr' and + r.dataStreamRoute='hl7' and + r.content.schema_name = 'upload' + group by r.content.metadata.filename) as d where d.tot_count>1 +``` +Sample output: +```json +[ + { + "filename": "InterPartner~CELR~DC~AIMSPlatform~Prod~Prod~20240308092005~STOP~CVS_COVID_DC_20231214085952.hl7(2).hl7", + "tot_count": 2 + }, + { + "filename": "10KB-test-file", + "tot_count": 2 + }, + { + "filename": "10KB-test-file", + "tot_count": 2 + }, + { + "filename": "InterPartner~CELR~OK~AIMSPlatform~Prod~Prod~20240318010038~STOP~SouthWestern_2024-03-17_01:11:32.hl7", + "tot_count": 2 + }, + { + "filename": "10KB-test-file", + "tot_count": 2 + }, + { + "filename": "10KB-test-file", + "tot_count": 4 + }, + { + "filename": "10KB-test-file", + "tot_count": 3 + }, + { + "filename": "InterPartner~CELR~DC~AIMSPlatform~Prod~Prod~20240308092016~STOP~CVS_COVID_DC_20230702211312.hl7(2).hl7", + "tot_count": 2 + }, + { + "filename": "InterPartner~CELR~DC~AIMSPlatform~Prod~Prod~20240309071824~STOP~CVS_COVID_DC_20230629165030.hl7.hl7", + "tot_count": 2 + }, + { + "filename": "InterPartner~CELR~DC~AIMSPlatform~Prod~Prod~20240310072008~STOP~CVS_COVID_DC_20230629165246.hl7(9).hl7", + "tot_count": 2 + }, + { + "filename": "InterPartner~CELR~DC~AIMSPlatform~Prod~Prod~20240308091859~STOP~CVS_COVID_DC_20230919090015.hl7(22).hl7", + "tot_count": 2 + }, + { + "filename": "InterPartner~CELR~DC~AIMSPlatform~Prod~Prod~20240326080124~STOP~CVS_COVID_DC_20230417085826.hl7(43).hl7", + "tot_count": 2 + }, + { + "filename": "10KB-test-file", + "tot_count": 2 + }, + { + "filename": "10KB-test-file", + "tot_count": 2 + }, + { + "filename": "10KB-test-file", + "tot_count": 2 + }, + { + "filename": "10KB-test-file", + "tot_count": 2 + }, + { + "filename": "10KB-test-file", + "tot_count": 2 + }, + { + "filename": "InterPartner~CELR~DC~AIMSPlatform~Prod~Prod~20240309072154~STOP~CVS_COVID_DC_20240127085951.hl7(21).hl7", + "tot_count": 2 + }, + { + "filename": "10KB-test-file", + "tot_count": 2 + }, + { + "filename": "10KB-test-file", + "tot_count": 2 + }, + { + "filename": "10KB-test-file", + "tot_count": 2 + } +] +``` +### Get rollup counts +```roomsql +select + r.content.schema_name, r.content.schema_version, count(r.stageName) as counts, r.stageName + from Reports r where r.dataStreamId = 'aims-celr' and r.dataStreamRoute = 'hl7' and r._ts >= 1714262400 and r._ts <= 1714348799 + group by r.stageName, r.content.schema_name, r.content.schema_version +``` +Sample output: +```json +[ + { + "schema_name": "DEX HL7v2 STRUCTURE-VALIDATOR", + "schema_version": "2.0.0", + "counts": 12757, + "stageName": "STRUCTURE-VALIDATOR" + }, + { + "schema_name": "DEX HL7v2 LAKE-SEGMENTS-TRANSFORMER", + "schema_version": "2.0.0", + "counts": 12624, + "stageName": "LAKE-SEGMENTS-TRANSFORMER" + }, + { + "schema_name": "DEX HL7v2 HL7-JSON-LAKE-TRANSFORMER", + "schema_version": "2.0.0", + "counts": 12624, + "stageName": "HL7-JSON-LAKE-TRANSFORMER" + }, + { + "schema_name": "DEX HL7v2 REDACTOR", + "schema_version": "2.0.0", + "counts": 12758, + "stageName": "REDACTOR" + }, + { + "schema_name": "DEX HL7v2 RECEIVER", + "schema_version": "2.0.0", + "counts": 11600, + "stageName": "RECEIVER" + }, + { + "schema_name": "upload", + "schema_version": "1.0", + "counts": 11598, + "stageName": "dex-upload" + } +] +``` \ No newline at end of file