Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev feature rollup counts #103

Merged
merged 3 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -257,16 +257,15 @@ public HttpResponseMessage getHL7DirectIndirectMessageCounts(
return new GetReportCountsFunction(request).getHL7DirectIndirectMessageCounts();
}

@FunctionName("GetUnfinishedUploadCounts")
@FunctionName("GetBadMetadataAndUnfinishedUploadCounts")
public HttpResponseMessage GetUnfinishedUploadCounts(
@HttpTrigger(
name = "req",
methods = {HttpMethod.GET},
route = "report/counts/totalUnfinishedUploads/{dataStreamId}",
route = "report/counts/badMetadataAndUnfinishedUploadCounts",
authLevel = AuthorizationLevel.ANONYMOUS
) HttpRequestMessage<Optional<String>> request,
@BindingName("dataStreamId") String dataStreamId) {
return new GetReportCountsFunction(request).getUnfinishedUploadCounts(dataStreamId);
) HttpRequestMessage<Optional<String>> request) {
return new GetReportCountsFunction(request).getBadMetadataAndUnfinishedUploadCounts();
}

@FunctionName("GetInvalidMessageCounts")
Expand All @@ -280,6 +279,17 @@ public HttpResponseMessage GetInvalidMessageCounts(
return new GetReportCountsFunction(request).getInvalidMessageCounts();
}

@FunctionName("GetRollupCounts")
public HttpResponseMessage getRollupCounts(
@HttpTrigger(
name = "req",
methods = {HttpMethod.GET},
route = "report/counts/submissions/rollup",
authLevel = AuthorizationLevel.ANONYMOUS
) HttpRequestMessage<Optional<String>> request) {
return new GetReportCountsFunction(request).getRollupCounts();
}

@FunctionName("GetStatusByUploadId")
public HttpResponseMessage getStatusByUploadId(
@HttpTrigger(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ class GetReportCountsFunction(
val numCompletedUploadingSqlQuery = (
"select "
+ "value count(1) "
+ "from Reports r "
+ "from $reportsContainerName r "
+ "where r.content.schema_name = 'upload' and r.content['offset'] = r.content.size and "
+ "r.dataStreamId = '$dataStreamId' and r.dataStreamRoute = '$dataStreamRoute' and $timeRangeWhereClause"
)
Expand All @@ -537,7 +537,7 @@ class GetReportCountsFunction(
val numUploadingSqlQuery = (
"select "
+ "value count(1) "
+ "from Reports r "
+ "from $reportsContainerName r "
+ "where r.content.schema_name = 'upload' and r.content['offset'] != r.content.size and "
+ "r.dataStreamId = '$dataStreamId' and r.dataStreamRoute = '$dataStreamRoute' and $timeRangeWhereClause"
)
Expand All @@ -551,7 +551,7 @@ class GetReportCountsFunction(
val numFailedSqlQuery = (
"select "
+ "value count(1) "
+ "from Reports r "
+ "from $reportsContainerName r "
+ "where r.content.schema_name = 'dex-metadata-verify' and r.content.issues != null and "
+ "r.dataStreamId = '$dataStreamId' and r.dataStreamRoute = '$dataStreamRoute' and $timeRangeWhereClause"
)
Expand Down Expand Up @@ -580,28 +580,30 @@ class GetReportCountsFunction(
}

/**
*Get direct and in-direct message counts
* Get direct and in-direct message counts
*
* * @return HttpResponseMessage
* @return HttpResponseMessage
*/
fun getHL7DirectIndirectMessageCounts(): HttpResponseMessage {

val queryParams = prepareQueryParameters(request)
//Verify the request is complete

// Verify the request is complete
checkRequiredCountsQueryParams(
queryParams?.get("dataStreamId"),
queryParams?.get("dataStreamRoute"),
queryParams?.get("dateStart"),
queryParams?.get("dateEnd"),
queryParams?.get("daysInterval"),
queryParams["dataStreamId"],
queryParams["dataStreamRoute"],
queryParams["dateStart"],
queryParams["dateEnd"],
queryParams["daysInterval"],
true
)?.let { return it }


val timeRangeWhereClause: String
try {
timeRangeWhereClause = buildSqlClauseForDateRange(queryParams?.get("daysInterval"),
queryParams?.get("dateStart"), queryParams?.get("dateEnd")
timeRangeWhereClause = buildSqlClauseForDateRange(
queryParams["daysInterval"],
queryParams["dateStart"],
queryParams["dateEnd"]
)
} catch (e: Exception) {
logger.error(e.localizedMessage)
Expand All @@ -613,14 +615,14 @@ class GetReportCountsFunction(

val directMessageQuery = (
"select value SUM(directCounts) "
+ " FROM (select value SUM(r.content.stage.report.number_of_messages) from Reports r "
+ " where r.content.schema_name = '${HL7Debatch.schemaDefinition.schemaName}' and "
+ " r.dataStreamId = '${queryParams?.get("dataStreamId")}' and r.dataStreamRoute = '${queryParams["dataStreamRoute"]}' and $timeRangeWhereClause) as directCounts"
+ "from (select value SUM(r.content.stage.report.number_of_messages) from $reportsContainerName r "
+ "where r.content.schema_name = '${HL7Debatch.schemaDefinition.schemaName}' and "
+ "r.dataStreamId = '${queryParams["dataStreamId"]}' and r.dataStreamRoute = '${queryParams["dataStreamRoute"]}' and $timeRangeWhereClause) as directCounts"
)

val indirectMessageQuery = (
"select value count(redactedCount) from ( "
+ "select * from Reports r where r.content.schema_name = '${HL7Redactor.schemaDefinition.schemaName}' and "
+ "select * from $reportsContainerName r where r.content.schema_name = '${HL7Redactor.schemaDefinition.schemaName}' and "
+ "r.dataStreamId = '${queryParams["dataStreamId"]}' and r.dataStreamRoute = '${queryParams["dataStreamRoute"]}' and $timeRangeWhereClause) as redactedCount"
)

Expand Down Expand Up @@ -652,25 +654,30 @@ class GetReportCountsFunction(
.build()
}


fun getUnfinishedUploadCounts (dataStreamId: String): HttpResponseMessage {
/**
* Get the number of uploads stopped due to metadata issues and also unfinished upload counts.
*
* @return HttpResponseMessage
*/
fun getBadMetadataAndUnfinishedUploadCounts(): HttpResponseMessage {
val queryParams = prepareQueryParameters(request)

//Verify the request is complete
// Verify the request is complete
checkRequiredCountsQueryParams(
dataStreamId,
queryParams?.get("dataStreamRoute"),
queryParams?.get("dateStart"),
queryParams?.get("dateEnd"),
queryParams?.get("daysInterval"),
queryParams["dataStreamId"],
queryParams["dataStreamRoute"],
queryParams["dateStart"],
queryParams["dateEnd"],
queryParams["daysInterval"],
true
)?.let { return it }


val timeRangeWhereClause: String
try {
timeRangeWhereClause = buildSqlClauseForDateRange(queryParams?.get("daysInterval"),
queryParams?.get("dateStart"), queryParams?.get("dateEnd")
timeRangeWhereClause = buildSqlClauseForDateRange(
queryParams["daysInterval"],
queryParams["dateStart"],
queryParams["dateEnd"]
)
} catch (e: Exception) {
logger.error(e.localizedMessage)
Expand All @@ -680,25 +687,42 @@ class GetReportCountsFunction(
.build()
}

val unfinishedCountsQuery = (
val startTime = System.currentTimeMillis()

val badMetadataCountsQuery = (
"select value count(1) "
+ " FROM Reports r where r.content.schema_name = 'dex-metadata-verify' and r.content.schema_name = 'upload' "
+ " and r.dataStreamId = '$dataStreamId' and r.dataStreamRoute = '${queryParams["dataStreamRoute"]}' and $timeRangeWhereClause"
+ "from $reportsContainerName r "
+ "where r.dataStreamId = '${queryParams["dataStreamId"]}' and r.dataStreamRoute = '${queryParams["dataStreamRoute"]}' and "
+ "r.content.schema_name = 'dex-metadata-verify' and "
+ "ARRAY_LENGTH(r.content.issues) > 0 and $timeRangeWhereClause"
)

val startTime = System.currentTimeMillis()
val unfinishedUploadsCountsQuery = (
"select value count(1) "
+ "from $reportsContainerName r "
+ "where r.dataStreamId = '${queryParams["dataStreamId"]}' and r.dataStreamRoute = '${queryParams["dataStreamRoute"]}' and "
+ "r.content.schema_name = 'upload' and "
+ "r.content['offset'] < r.content['size'] and $timeRangeWhereClause"
)

val directCountResult = reportsContainer.queryItems(
unfinishedCountsQuery, CosmosQueryRequestOptions(),
val badMetadataCountsResult = reportsContainer.queryItems(
badMetadataCountsQuery, CosmosQueryRequestOptions(),
Float::class.java
)

val badMetadataCount = badMetadataCountsResult.firstOrNull() ?: 0

val unfinishedTotalItems = directCountResult.firstOrNull() ?: 0
val unfinishedUploadsCountsResult = reportsContainer.queryItems(
unfinishedUploadsCountsQuery, CosmosQueryRequestOptions(),
Float::class.java
)

val unfinishedUploadsCount = unfinishedUploadsCountsResult.firstOrNull() ?: 0

val endTime = System.currentTimeMillis()
val countsJson = JSONObject()
.put("unfinished_upload_counts", unfinishedTotalItems)
.put("bad_metadata_count", badMetadataCount)
.put("unfinished_upload_count", unfinishedUploadsCount)
.put("query_time_millis", endTime - startTime)

return request
Expand All @@ -708,24 +732,32 @@ class GetReportCountsFunction(
.build()
}

/**
* Get the number of invalid messages count using two different methods.
*
* @return HttpResponseMessage
*/
fun getInvalidMessageCounts() : HttpResponseMessage {

val queryParams = prepareQueryParameters(request)

//Verify the request is complete
checkRequiredCountsQueryParams(
queryParams?.get("dataStreamId"),
queryParams?.get("dataStreamRoute"),
queryParams?.get("dateStart"),
queryParams?.get("dateEnd"),
queryParams?.get("daysInterval"),
queryParams["dataStreamId"],
queryParams["dataStreamRoute"],
queryParams["dateStart"],
queryParams["dateEnd"],
queryParams["daysInterval"],
true
)?.let { return it }


val timeRangeWhereClause: String
try {
timeRangeWhereClause = buildSqlClauseForDateRange(queryParams?.get("daysInterval"),
queryParams?.get("dateStart"), queryParams?.get("dateEnd")
timeRangeWhereClause = buildSqlClauseForDateRange(
queryParams["daysInterval"],
queryParams["dateStart"],
queryParams["dateEnd"]
)
} catch (e: Exception) {
logger.error(e.localizedMessage)
Expand All @@ -737,7 +769,7 @@ class GetReportCountsFunction(

val directInvalidMessageQuery = (
"select value SUM(directCounts) "
+ " FROM (select value SUM(r.content.stage.report.number_of_messages) from Reports r "
+ " FROM (select value SUM(r.content.stage.report.number_of_messages) from $reportsContainerName r "
+ " where r.content.schema_name = '${HL7Redactor.schemaDefinition.schemaName}' and "
+ " r.dataStreamId = '${queryParams?.get("dataStreamId")}' and r.dataStreamRoute = '${queryParams["dataStreamRoute"]}' and $timeRangeWhereClause) as directCounts"
)
Expand All @@ -752,7 +784,7 @@ class GetReportCountsFunction(

val indirectInvalidMessageQuery = (
"select value count(invalidCounts) from ( "
+ "select * from Reports r where r.content.schema_name != 'HL7-JSON-LAKE-TRANSFORMER' and "
+ "select * from $reportsContainerName r where r.content.schema_name != 'HL7-JSON-LAKE-TRANSFORMER' and "
+ "r.dataStreamId = '${queryParams["dataStreamId"]}' and r.dataStreamRoute = '${queryParams["dataStreamRoute"]}' and $timeRangeWhereClause) as invalidCounts"
)

Expand All @@ -773,14 +805,15 @@ class GetReportCountsFunction(
Float::class.java
)

val directTotalItems = directRedactedCountResult.firstOrNull() ?: 0 + directStructureCountResult.firstOrNull()!!
?: 0
val inDirectTotalItems = indirectCountResult.firstOrNull() ?: 0
val directRedactedCount = directRedactedCountResult.firstOrNull() ?: 0f
val directStructureCount = directStructureCountResult.firstOrNull() ?: 0f
val directTotalItems = directRedactedCount + directStructureCount
val inDirectTotalItems = indirectCountResult.firstOrNull() ?: 0f

val endTime = System.currentTimeMillis()
val countsJson = JSONObject()
.put("direct_counts", directTotalItems)
.put("indirect_counts", inDirectTotalItems)
.put("invalid_message_direct_counts", directTotalItems)
.put("invalid_message_indirect_counts", inDirectTotalItems)
.put("query_time_millis", endTime - startTime)

return request
Expand All @@ -790,6 +823,66 @@ class GetReportCountsFunction(
.build()
}

/**
* Get a rollup of the counts for the data stream provided.
*
* @param request HttpRequestMessage<Optional<String>>
* @return HttpResponseMessage
*/
fun getRollupCounts(): HttpResponseMessage {

val queryParams = prepareQueryParameters(request)

// Verify the request is complete
checkRequiredCountsQueryParams(
queryParams["dataStreamId"],
queryParams["dataStreamRoute"],
queryParams["dateStart"],
queryParams["dateEnd"],
queryParams["daysInterval"],
true
)?.let { return it }

val timeRangeWhereClause: String
try {
timeRangeWhereClause = buildSqlClauseForDateRange(
queryParams["daysInterval"],
queryParams["dateStart"],
queryParams["dateEnd"]
)
} catch (e: Exception) {
logger.error(e.localizedMessage)
return request
.createResponseBuilder(HttpStatus.BAD_REQUEST)
.body(e.localizedMessage)
.build()
}

val rollupCountsQuery = (
"select "
+ "r.content.schema_name, r.content.schema_version, count(r.stageName) as counts, r.stageName "
+ "from $reportsContainerName r where r.dataStreamId = '${queryParams["dataStreamId"]}' and "
+ "r.dataStreamRoute = '${queryParams["dataStreamRoute"]}' and $timeRangeWhereClause "
+ "group by r.stageName, r.content.schema_name, r.content.schema_version"
)

val rollupCountsResult = reportsContainer.queryItems(
rollupCountsQuery, CosmosQueryRequestOptions(),
StageCounts::class.java
)

val rollupCounts = mutableListOf<StageCounts>()
if (rollupCountsResult.count() > 0) {
rollupCounts.addAll(rollupCountsResult.toList())
}

return request
.createResponseBuilder(HttpStatus.OK)
.header("Content-Type", "application/json")
.body(gson.toJson(rollupCounts))
.build()
}

private fun prepareQueryParameters(request: HttpRequestMessage<Optional<String>>): Map<String, String?> {
val queryParams = request.queryParameters
val dataStreamId = queryParams["data_stream_id"]
Expand Down
Loading
Loading