Skip to content

Commit

Permalink
Dev feature rollup counts (#103)
Browse files Browse the repository at this point in the history
* Documented some helpful manual queries in a readme

* Initial version of submission rollup counts by stage endpoint

* Fixes to new counts endpoints

---------

Co-authored-by: Matt B Krystof <[email protected]>
  • Loading branch information
mkrystof and Matt B Krystof committed May 13, 2024
1 parent ccd799c commit 38e73b7
Show file tree
Hide file tree
Showing 3 changed files with 428 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -257,16 +257,15 @@ public HttpResponseMessage getHL7DirectIndirectMessageCounts(
return new GetReportCountsFunction(request).getHL7DirectIndirectMessageCounts();
}

@FunctionName("GetUnfinishedUploadCounts")
@FunctionName("GetBadMetadataAndUnfinishedUploadCounts")
public HttpResponseMessage GetUnfinishedUploadCounts(
@HttpTrigger(
name = "req",
methods = {HttpMethod.GET},
route = "report/counts/totalUnfinishedUploads/{dataStreamId}",
route = "report/counts/badMetadataAndUnfinishedUploadCounts",
authLevel = AuthorizationLevel.ANONYMOUS
) HttpRequestMessage<Optional<String>> request,
@BindingName("dataStreamId") String dataStreamId) {
return new GetReportCountsFunction(request).getUnfinishedUploadCounts(dataStreamId);
) HttpRequestMessage<Optional<String>> request) {
return new GetReportCountsFunction(request).getBadMetadataAndUnfinishedUploadCounts();
}

@FunctionName("GetInvalidMessageCounts")
Expand All @@ -280,6 +279,17 @@ public HttpResponseMessage GetInvalidMessageCounts(
return new GetReportCountsFunction(request).getInvalidMessageCounts();
}

/**
 * HTTP GET trigger for the submission rollup counts, served at
 * {@code report/counts/submissions/rollup} with anonymous authorization.
 *
 * The work is delegated to {@code GetReportCountsFunction#getRollupCounts()}, which is
 * constructed around the incoming request.
 *
 * @param request the incoming HTTP request
 * @return the response produced by {@code GetReportCountsFunction#getRollupCounts()}
 */
@FunctionName("GetRollupCounts")
public HttpResponseMessage getRollupCounts(
        @HttpTrigger(
                name = "req",
                methods = {HttpMethod.GET},
                route = "report/counts/submissions/rollup",
                authLevel = AuthorizationLevel.ANONYMOUS
        ) HttpRequestMessage<Optional<String>> request) {
    // Build the handler for this request, then let it produce the response.
    final GetReportCountsFunction countsHandler = new GetReportCountsFunction(request);
    return countsHandler.getRollupCounts();
}

@FunctionName("GetStatusByUploadId")
public HttpResponseMessage getStatusByUploadId(
@HttpTrigger(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ class GetReportCountsFunction(
val numCompletedUploadingSqlQuery = (
"select "
+ "value count(1) "
+ "from Reports r "
+ "from $reportsContainerName r "
+ "where r.content.schema_name = 'upload' and r.content['offset'] = r.content.size and "
+ "r.dataStreamId = '$dataStreamId' and r.dataStreamRoute = '$dataStreamRoute' and $timeRangeWhereClause"
)
Expand All @@ -537,7 +537,7 @@ class GetReportCountsFunction(
val numUploadingSqlQuery = (
"select "
+ "value count(1) "
+ "from Reports r "
+ "from $reportsContainerName r "
+ "where r.content.schema_name = 'upload' and r.content['offset'] != r.content.size and "
+ "r.dataStreamId = '$dataStreamId' and r.dataStreamRoute = '$dataStreamRoute' and $timeRangeWhereClause"
)
Expand All @@ -551,7 +551,7 @@ class GetReportCountsFunction(
val numFailedSqlQuery = (
"select "
+ "value count(1) "
+ "from Reports r "
+ "from $reportsContainerName r "
+ "where r.content.schema_name = 'dex-metadata-verify' and r.content.issues != null and "
+ "r.dataStreamId = '$dataStreamId' and r.dataStreamRoute = '$dataStreamRoute' and $timeRangeWhereClause"
)
Expand Down Expand Up @@ -580,28 +580,30 @@ class GetReportCountsFunction(
}

/**
*Get direct and in-direct message counts
* Get direct and indirect message counts
*
* * @return HttpResponseMessage
* @return HttpResponseMessage
*/
fun getHL7DirectIndirectMessageCounts(): HttpResponseMessage {

val queryParams = prepareQueryParameters(request)
//Verify the request is complete

// Verify the request is complete
checkRequiredCountsQueryParams(
queryParams?.get("dataStreamId"),
queryParams?.get("dataStreamRoute"),
queryParams?.get("dateStart"),
queryParams?.get("dateEnd"),
queryParams?.get("daysInterval"),
queryParams["dataStreamId"],
queryParams["dataStreamRoute"],
queryParams["dateStart"],
queryParams["dateEnd"],
queryParams["daysInterval"],
true
)?.let { return it }


val timeRangeWhereClause: String
try {
timeRangeWhereClause = buildSqlClauseForDateRange(queryParams?.get("daysInterval"),
queryParams?.get("dateStart"), queryParams?.get("dateEnd")
timeRangeWhereClause = buildSqlClauseForDateRange(
queryParams["daysInterval"],
queryParams["dateStart"],
queryParams["dateEnd"]
)
} catch (e: Exception) {
logger.error(e.localizedMessage)
Expand All @@ -613,14 +615,14 @@ class GetReportCountsFunction(

val directMessageQuery = (
"select value SUM(directCounts) "
+ " FROM (select value SUM(r.content.stage.report.number_of_messages) from Reports r "
+ " where r.content.schema_name = '${HL7Debatch.schemaDefinition.schemaName}' and "
+ " r.dataStreamId = '${queryParams?.get("dataStreamId")}' and r.dataStreamRoute = '${queryParams["dataStreamRoute"]}' and $timeRangeWhereClause) as directCounts"
+ "from (select value SUM(r.content.stage.report.number_of_messages) from $reportsContainerName r "
+ "where r.content.schema_name = '${HL7Debatch.schemaDefinition.schemaName}' and "
+ "r.dataStreamId = '${queryParams["dataStreamId"]}' and r.dataStreamRoute = '${queryParams["dataStreamRoute"]}' and $timeRangeWhereClause) as directCounts"
)

val indirectMessageQuery = (
"select value count(redactedCount) from ( "
+ "select * from Reports r where r.content.schema_name = '${HL7Redactor.schemaDefinition.schemaName}' and "
+ "select * from $reportsContainerName r where r.content.schema_name = '${HL7Redactor.schemaDefinition.schemaName}' and "
+ "r.dataStreamId = '${queryParams["dataStreamId"]}' and r.dataStreamRoute = '${queryParams["dataStreamRoute"]}' and $timeRangeWhereClause) as redactedCount"
)

Expand Down Expand Up @@ -652,25 +654,30 @@ class GetReportCountsFunction(
.build()
}


fun getUnfinishedUploadCounts (dataStreamId: String): HttpResponseMessage {
/**
* Get the number of uploads stopped due to metadata issues and also unfinished upload counts.
*
* @return HttpResponseMessage
*/
fun getBadMetadataAndUnfinishedUploadCounts(): HttpResponseMessage {
val queryParams = prepareQueryParameters(request)

//Verify the request is complete
// Verify the request is complete
checkRequiredCountsQueryParams(
dataStreamId,
queryParams?.get("dataStreamRoute"),
queryParams?.get("dateStart"),
queryParams?.get("dateEnd"),
queryParams?.get("daysInterval"),
queryParams["dataStreamId"],
queryParams["dataStreamRoute"],
queryParams["dateStart"],
queryParams["dateEnd"],
queryParams["daysInterval"],
true
)?.let { return it }


val timeRangeWhereClause: String
try {
timeRangeWhereClause = buildSqlClauseForDateRange(queryParams?.get("daysInterval"),
queryParams?.get("dateStart"), queryParams?.get("dateEnd")
timeRangeWhereClause = buildSqlClauseForDateRange(
queryParams["daysInterval"],
queryParams["dateStart"],
queryParams["dateEnd"]
)
} catch (e: Exception) {
logger.error(e.localizedMessage)
Expand All @@ -680,25 +687,42 @@ class GetReportCountsFunction(
.build()
}

val unfinishedCountsQuery = (
val startTime = System.currentTimeMillis()

val badMetadataCountsQuery = (
"select value count(1) "
+ " FROM Reports r where r.content.schema_name = 'dex-metadata-verify' and r.content.schema_name = 'upload' "
+ " and r.dataStreamId = '$dataStreamId' and r.dataStreamRoute = '${queryParams["dataStreamRoute"]}' and $timeRangeWhereClause"
+ "from $reportsContainerName r "
+ "where r.dataStreamId = '${queryParams["dataStreamId"]}' and r.dataStreamRoute = '${queryParams["dataStreamRoute"]}' and "
+ "r.content.schema_name = 'dex-metadata-verify' and "
+ "ARRAY_LENGTH(r.content.issues) > 0 and $timeRangeWhereClause"
)

val startTime = System.currentTimeMillis()
val unfinishedUploadsCountsQuery = (
"select value count(1) "
+ "from $reportsContainerName r "
+ "where r.dataStreamId = '${queryParams["dataStreamId"]}' and r.dataStreamRoute = '${queryParams["dataStreamRoute"]}' and "
+ "r.content.schema_name = 'upload' and "
+ "r.content['offset'] < r.content['size'] and $timeRangeWhereClause"
)

val directCountResult = reportsContainer.queryItems(
unfinishedCountsQuery, CosmosQueryRequestOptions(),
val badMetadataCountsResult = reportsContainer.queryItems(
badMetadataCountsQuery, CosmosQueryRequestOptions(),
Float::class.java
)

val badMetadataCount = badMetadataCountsResult.firstOrNull() ?: 0

val unfinishedTotalItems = directCountResult.firstOrNull() ?: 0
val unfinishedUploadsCountsResult = reportsContainer.queryItems(
unfinishedUploadsCountsQuery, CosmosQueryRequestOptions(),
Float::class.java
)

val unfinishedUploadsCount = unfinishedUploadsCountsResult.firstOrNull() ?: 0

val endTime = System.currentTimeMillis()
val countsJson = JSONObject()
.put("unfinished_upload_counts", unfinishedTotalItems)
.put("bad_metadata_count", badMetadataCount)
.put("unfinished_upload_count", unfinishedUploadsCount)
.put("query_time_millis", endTime - startTime)

return request
Expand All @@ -708,24 +732,32 @@ class GetReportCountsFunction(
.build()
}

/**
* Get the invalid message counts using two different methods.
*
* @return HttpResponseMessage
*/
fun getInvalidMessageCounts() : HttpResponseMessage {

val queryParams = prepareQueryParameters(request)

//Verify the request is complete
checkRequiredCountsQueryParams(
queryParams?.get("dataStreamId"),
queryParams?.get("dataStreamRoute"),
queryParams?.get("dateStart"),
queryParams?.get("dateEnd"),
queryParams?.get("daysInterval"),
queryParams["dataStreamId"],
queryParams["dataStreamRoute"],
queryParams["dateStart"],
queryParams["dateEnd"],
queryParams["daysInterval"],
true
)?.let { return it }


val timeRangeWhereClause: String
try {
timeRangeWhereClause = buildSqlClauseForDateRange(queryParams?.get("daysInterval"),
queryParams?.get("dateStart"), queryParams?.get("dateEnd")
timeRangeWhereClause = buildSqlClauseForDateRange(
queryParams["daysInterval"],
queryParams["dateStart"],
queryParams["dateEnd"]
)
} catch (e: Exception) {
logger.error(e.localizedMessage)
Expand All @@ -737,7 +769,7 @@ class GetReportCountsFunction(

val directInvalidMessageQuery = (
"select value SUM(directCounts) "
+ " FROM (select value SUM(r.content.stage.report.number_of_messages) from Reports r "
+ " FROM (select value SUM(r.content.stage.report.number_of_messages) from $reportsContainerName r "
+ " where r.content.schema_name = '${HL7Redactor.schemaDefinition.schemaName}' and "
+ " r.dataStreamId = '${queryParams?.get("dataStreamId")}' and r.dataStreamRoute = '${queryParams["dataStreamRoute"]}' and $timeRangeWhereClause) as directCounts"
)
Expand All @@ -752,7 +784,7 @@ class GetReportCountsFunction(

val indirectInvalidMessageQuery = (
"select value count(invalidCounts) from ( "
+ "select * from Reports r where r.content.schema_name != 'HL7-JSON-LAKE-TRANSFORMER' and "
+ "select * from $reportsContainerName r where r.content.schema_name != 'HL7-JSON-LAKE-TRANSFORMER' and "
+ "r.dataStreamId = '${queryParams["dataStreamId"]}' and r.dataStreamRoute = '${queryParams["dataStreamRoute"]}' and $timeRangeWhereClause) as invalidCounts"
)

Expand All @@ -773,14 +805,15 @@ class GetReportCountsFunction(
Float::class.java
)

val directTotalItems = directRedactedCountResult.firstOrNull() ?: 0 + directStructureCountResult.firstOrNull()!!
?: 0
val inDirectTotalItems = indirectCountResult.firstOrNull() ?: 0
val directRedactedCount = directRedactedCountResult.firstOrNull() ?: 0f
val directStructureCount = directStructureCountResult.firstOrNull() ?: 0f
val directTotalItems = directRedactedCount + directStructureCount
val inDirectTotalItems = indirectCountResult.firstOrNull() ?: 0f

val endTime = System.currentTimeMillis()
val countsJson = JSONObject()
.put("direct_counts", directTotalItems)
.put("indirect_counts", inDirectTotalItems)
.put("invalid_message_direct_counts", directTotalItems)
.put("invalid_message_indirect_counts", inDirectTotalItems)
.put("query_time_millis", endTime - startTime)

return request
Expand All @@ -790,6 +823,66 @@ class GetReportCountsFunction(
.build()
}

/**
 * Get a rollup of the report counts for the data stream provided, grouped by stage name,
 * schema name and schema version.
 *
 * The data stream id, data stream route and date range are taken from the query parameters of
 * the HTTP request held by this object (see prepareQueryParameters) and are validated with
 * checkRequiredCountsQueryParams before the reports container is queried.
 *
 * @return HttpResponseMessage - OK with a JSON array of [StageCounts] (an empty array when nothing
 * matches); BAD_REQUEST with the error message when the date range clause cannot be built; or the
 * response returned by checkRequiredCountsQueryParams when the request is incomplete.
 */
fun getRollupCounts(): HttpResponseMessage {

    val queryParams = prepareQueryParameters(request)

    // Verify the request is complete
    checkRequiredCountsQueryParams(
        queryParams["dataStreamId"],
        queryParams["dataStreamRoute"],
        queryParams["dateStart"],
        queryParams["dateEnd"],
        queryParams["daysInterval"],
        true
    )?.let { return it }

    val timeRangeWhereClause = try {
        buildSqlClauseForDateRange(
            queryParams["daysInterval"],
            queryParams["dateStart"],
            queryParams["dateEnd"]
        )
    } catch (e: Exception) {
        logger.error(e.localizedMessage)
        return request
            .createResponseBuilder(HttpStatus.BAD_REQUEST)
            .body(e.localizedMessage)
            .build()
    }

    // The data stream id and route come straight from the HTTP request, so they are bound as
    // query parameters instead of being spliced into the SQL text.
    // NOTE(review): the time range clause is still interpolated; confirm that
    // buildSqlClauseForDateRange only emits validated date values.
    val rollupCountsSql = "select " +
        "r.content.schema_name, r.content.schema_version, count(r.stageName) as counts, r.stageName " +
        "from $reportsContainerName r where r.dataStreamId = @dataStreamId and " +
        "r.dataStreamRoute = @dataStreamRoute and $timeRangeWhereClause " +
        "group by r.stageName, r.content.schema_name, r.content.schema_version"

    // Fully qualified names are used so this function does not depend on additional imports.
    val rollupCountsQuery = com.azure.cosmos.models.SqlQuerySpec(
        rollupCountsSql,
        listOf(
            com.azure.cosmos.models.SqlParameter("@dataStreamId", queryParams["dataStreamId"]),
            com.azure.cosmos.models.SqlParameter("@dataStreamRoute", queryParams["dataStreamRoute"])
        )
    )

    // Materialize the result exactly once. Previously the result was counted and then listed,
    // which traversed it twice (presumably re-issuing the query each time - verify against the SDK).
    // An empty result simply yields an empty list.
    val rollupCounts: List<StageCounts> = reportsContainer.queryItems(
        rollupCountsQuery, CosmosQueryRequestOptions(),
        StageCounts::class.java
    ).toList()

    return request
        .createResponseBuilder(HttpStatus.OK)
        .header("Content-Type", "application/json")
        .body(gson.toJson(rollupCounts))
        .build()
}

private fun prepareQueryParameters(request: HttpRequestMessage<Optional<String>>): Map<String, String?> {
val queryParams = request.queryParameters
val dataStreamId = queryParams["data_stream_id"]
Expand Down
Loading

0 comments on commit 38e73b7

Please sign in to comment.