Skip to content

Commit

Permalink
Paginate the counts results
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt B Krystof committed Mar 8, 2024
1 parent 2ab2105 commit 7332342
Show file tree
Hide file tree
Showing 5 changed files with 222 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import gov.cdc.ocio.processingstatusapi.model.reports.stagereports.HL7Validation
import gov.cdc.ocio.processingstatusapi.model.traces.*
import gov.cdc.ocio.processingstatusapi.utils.DateUtils
import gov.cdc.ocio.processingstatusapi.utils.JsonUtils
import gov.cdc.ocio.processingstatusapi.utils.PageUtils
import mu.KotlinLogging
import org.joda.time.DateTime
import org.joda.time.DateTimeZone
Expand Down Expand Up @@ -219,6 +220,9 @@ class GetReportCountsFunction(
val dataStreamId = request.queryParameters["data_stream_id"]
val dataStreamRoute = request.queryParameters["data_stream_route"]

val pageSize = request.queryParameters["page_size"]
val pageNumber = request.queryParameters["page_number"]

val dateStart = request.queryParameters["date_start"]
val dateEnd = request.queryParameters["date_end"]

Expand All @@ -238,34 +242,49 @@ class GetReportCountsFunction(
.build()
}

val uploadIdsSqlQuery = StringBuilder()
uploadIdsSqlQuery.append(
"select "
+ "distinct value r.uploadId "
+ "from $reportsContainerName r "
+ "where r.dataStreamId = '$dataStreamId' and r.dataStreamRoute = '$dataStreamRoute'"
)
val pageUtils = PageUtils.Builder()
.maxPageSize(500)
.defaultPageSize(100)
.build()

val pageSizeAsInt = try {
pageUtils.getPageSize(pageSize)
} catch (ex: BadRequestException) {
return request
.createResponseBuilder(HttpStatus.BAD_REQUEST)
.body(ex.localizedMessage)
.build()
}

// Get the total matching upload ids
val uploadIdCountSqlQuery = StringBuilder()
uploadIdCountSqlQuery.append(
"select "
+ "value count(1) "
+ "from (select distinct r.uploadId from $reportsContainerName r "
+ "where r.dataStreamId = '$dataStreamId' and r.dataStreamRoute = '$dataStreamRoute')"
)
if (!daysInterval.isNullOrBlank() && (!dateStart.isNullOrBlank() || !dateEnd.isNullOrBlank())) {
return request
.createResponseBuilder(HttpStatus.BAD_REQUEST)
.body("date_interval and date_start/date_end can't be used simultaneously")
.build()
}

val timeRangeSqlPortion = StringBuilder()
if (!daysInterval.isNullOrBlank()) {
val dateStartEpochSecs = DateTime
.now(DateTimeZone.UTC)
.minusDays(Integer.parseInt(daysInterval))
.withTimeAtStartOfDay()
.toDate()
.time / 1000
uploadIdsSqlQuery.append(" and r._ts >= $dateStartEpochSecs")
timeRangeSqlPortion.append(" and r._ts >= $dateStartEpochSecs")
} else {
dateStart?.run {
try {
val dateStartEpochSecs = DateUtils.getEpochFromDateString(dateStart, "date_start")
uploadIdsSqlQuery.append(" and r._ts >= $dateStartEpochSecs")
timeRangeSqlPortion.append(" and r._ts >= $dateStartEpochSecs")
} catch (e: BadRequestException) {
logger.error(e.localizedMessage)
return request
Expand All @@ -277,7 +296,7 @@ class GetReportCountsFunction(
dateEnd?.run {
try {
val dateEndEpochSecs = DateUtils.getEpochFromDateString(dateEnd, "date_end")
uploadIdsSqlQuery.append(" and r._ts < $dateEndEpochSecs")
timeRangeSqlPortion.append(" and r._ts < $dateEndEpochSecs")
} catch (e: BadRequestException) {
logger.error(e.localizedMessage)
return request
Expand All @@ -287,83 +306,115 @@ class GetReportCountsFunction(
}
}
}
uploadIdCountSqlQuery.append(timeRangeSqlPortion)

// Get the matching uploadIds
val uploadIds = reportsContainer.queryItems(
uploadIdsSqlQuery.toString(), CosmosQueryRequestOptions(),
String::class.java
val uploadIdCountResult = reportsContainer.queryItems(
uploadIdCountSqlQuery.toString(), CosmosQueryRequestOptions(),
Long::class.java
)
val totalItems = uploadIdCountResult.first()
val numberOfPages: Int
val pageNumberAsInt: Int
val reportCountsList = mutableListOf<ReportCounts>()
if (totalItems > 0L) {
numberOfPages = (totalItems / pageSizeAsInt + if (totalItems % pageSizeAsInt > 0) 1 else 0).toInt()

pageNumberAsInt = try {
PageUtils.getPageNumber(pageNumber, numberOfPages)
} catch (ex: BadRequestException) {
return request
.createResponseBuilder(HttpStatus.BAD_REQUEST)
.body(ex.localizedMessage)
.build()
}
val offset = (pageNumberAsInt - 1) * pageSizeAsInt

if (uploadIds.count() > 0) {
val uploadIdsList = uploadIds.toList()
val quotedUploadIds = uploadIdsList.joinToString("\",\"", "\"", "\"")
val reportsSqlQuery = (
val uploadIdsSqlQuery = StringBuilder()
uploadIdsSqlQuery.append(
"select "
+ "r.uploadId, r.content.schema_name, r.content.schema_version, MIN(r.timestamp) as timestamp, count(r.stageName) as counts, r.stageName "
+ "from $reportsContainerName r where r.uploadId in ($quotedUploadIds) "
+ "group by r.uploadId, r.stageName, r.content.schema_name, r.content.schema_version"
+ "distinct value r.uploadId "
+ "from $reportsContainerName r "
+ "where r.dataStreamId = '$dataStreamId' and r.dataStreamRoute = '$dataStreamRoute' "
+ "$timeRangeSqlPortion offset $offset limit $pageSizeAsInt"
)
val reportItems = reportsContainer.queryItems(
reportsSqlQuery, CosmosQueryRequestOptions(),
StageCountsForUpload::class.java

// Get the matching uploadIds
val uploadIds = reportsContainer.queryItems(
uploadIdsSqlQuery.toString(), CosmosQueryRequestOptions(),
String::class.java
)

if (reportItems.count() > 0) {
val stageCountsByUploadId = mutableMapOf<String, MutableList<StageCounts>>()
val earliestTimestampByUploadId = mutableMapOf<String, Date>()
reportItems.forEach {
val list = stageCountsByUploadId[it.uploadId!!] ?: mutableListOf()
list.add(StageCounts().apply {
this.schema_name = it.schema_name
this.schema_version = it.schema_version
this.counts = it.counts
this.stageName = it.stageName
it.timestamp?.let { timestamp ->
val uploadId = it.uploadId
uploadId?.let {
var earliestTimestamp = earliestTimestampByUploadId[uploadId]
if (earliestTimestamp == null)
earliestTimestamp = timestamp
else if (timestamp.before(earliestTimestamp))
earliestTimestamp = timestamp
earliestTimestampByUploadId[uploadId] = earliestTimestamp
if (uploadIds.count() > 0) {
val uploadIdsList = uploadIds.toList()
val quotedUploadIds = uploadIdsList.joinToString("\",\"", "\"", "\"")
val reportsSqlQuery = (
"select "
+ "r.uploadId, r.content.schema_name, r.content.schema_version, MIN(r.timestamp) as timestamp, count(r.stageName) as counts, r.stageName "
+ "from $reportsContainerName r where r.uploadId in ($quotedUploadIds) "
+ "group by r.uploadId, r.stageName, r.content.schema_name, r.content.schema_version"
)
val reportItems = reportsContainer.queryItems(
reportsSqlQuery, CosmosQueryRequestOptions(),
StageCountsForUpload::class.java
)

if (reportItems.count() > 0) {
val stageCountsByUploadId = mutableMapOf<String, MutableList<StageCounts>>()
val earliestTimestampByUploadId = mutableMapOf<String, Date>()
reportItems.forEach {
val list = stageCountsByUploadId[it.uploadId!!] ?: mutableListOf()
list.add(StageCounts().apply {
this.schema_name = it.schema_name
this.schema_version = it.schema_version
this.counts = it.counts
this.stageName = it.stageName
it.timestamp?.let { timestamp ->
val uploadId = it.uploadId
uploadId?.let {
var earliestTimestamp = earliestTimestampByUploadId[uploadId]
if (earliestTimestamp == null)
earliestTimestamp = timestamp
else if (timestamp.before(earliestTimestamp))
earliestTimestamp = timestamp
earliestTimestampByUploadId[uploadId] = earliestTimestamp
}
}
}
})
stageCountsByUploadId[it.uploadId!!] = list
}
val revisedStageCountsByUploadId = getCounts(stageCountsByUploadId)

val reportCountsList = mutableListOf<ReportCounts>()
revisedStageCountsByUploadId.forEach { upload ->
val uploadId = upload.key
reportCountsList.add(ReportCounts().apply {
this.uploadId = uploadId
this.dataStreamId = dataStreamId
this.dataStreamRoute = dataStreamRoute
this.timestamp = earliestTimestampByUploadId[uploadId]
this.stages = upload.value
})
}

val aggregateReportCounts = AggregateReportCounts().apply {
this.summary = AggregateSummary().apply {
this.numUploads = reportCountsList.count()
})
stageCountsByUploadId[it.uploadId!!] = list
}
val revisedStageCountsByUploadId = getCounts(stageCountsByUploadId)

revisedStageCountsByUploadId.forEach { upload ->
val uploadId = upload.key
reportCountsList.add(ReportCounts().apply {
this.uploadId = uploadId
this.dataStreamId = dataStreamId
this.dataStreamRoute = dataStreamRoute
this.timestamp = earliestTimestampByUploadId[uploadId]
this.stages = upload.value
})
}
this.reportCountsList = reportCountsList
}
}
} else {
numberOfPages = 0
pageNumberAsInt = 0
}

return request
.createResponseBuilder(HttpStatus.OK)
.header("Content-Type", "application/json")
.body(gson.toJson(aggregateReportCounts))
.build()
val aggregateReportCounts = AggregateReportCounts().apply {
this.summary = PageSummary().apply {
this.pageNumber = pageNumberAsInt
this.numberOfPages = numberOfPages
this.pageSize = pageSizeAsInt
this.totalItems = totalItems
}
this.reportCountsList = reportCountsList
}

return request
.createResponseBuilder(HttpStatus.BAD_REQUEST)
.body("No results found")
.createResponseBuilder(HttpStatus.OK)
.header("Content-Type", "application/json")
.body(gson.toJson(aggregateReportCounts))
.build()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import gov.cdc.ocio.processingstatusapi.model.UploadsStatus
import gov.cdc.ocio.processingstatusapi.model.reports.UploadCounts
import gov.cdc.ocio.processingstatusapi.utils.DateUtils
import gov.cdc.ocio.processingstatusapi.utils.JsonUtils
import gov.cdc.ocio.processingstatusapi.utils.PageUtils
import mu.KotlinLogging
import java.util.*

Expand Down Expand Up @@ -57,8 +58,14 @@ class GetUploadStatusFunction(
val sortBy = request.queryParameters["sort_by"]
val sortOrder = request.queryParameters["sort_order"]

val pageUtils = PageUtils.Builder()
.minPageSize(MIN_PAGE_SIZE)
.maxPageSize(MAX_PAGE_SIZE)
.defaultPageSize(DEFAULT_PAGE_SIZE)
.build()

val pageSizeAsInt = try {
getPageSize(pageSize)
pageUtils.getPageSize(pageSize)
} catch (ex: BadRequestException) {
return request
.createResponseBuilder(HttpStatus.BAD_REQUEST)
Expand Down Expand Up @@ -124,7 +131,7 @@ class GetUploadStatusFunction(
numberOfPages = (totalItems / pageSizeAsInt + if (totalItems % pageSizeAsInt > 0) 1 else 0).toInt()

pageNumberAsInt = try {
getPageNumber(pageNumber, numberOfPages)
PageUtils.getPageNumber(pageNumber, numberOfPages)
} catch (ex: BadRequestException) {
return request
.createResponseBuilder(HttpStatus.BAD_REQUEST)
Expand Down Expand Up @@ -208,67 +215,9 @@ class GetUploadStatusFunction(
.build()
}

/**
* Get page size if valid.
*
* @param pageSize String?
* @return Int
* @throws BadRequestException
*/
@Throws(BadRequestException::class)
private fun getPageSize(pageSize: String?) = run {
var pageSizeAsInt = DEFAULT_PAGE_SIZE
pageSize?.run {
var issue = false
try {
pageSizeAsInt = pageSize.toInt()
if (pageSizeAsInt < MIN_PAGE_SIZE || pageSizeAsInt > MAX_PAGE_SIZE)
issue = true
} catch (e: NumberFormatException) {
issue = true
}

if (issue) {
throw BadRequestException("\"page_size must be between $MIN_PAGE_SIZE and $MAX_PAGE_SIZE\"")
}
}
pageSizeAsInt
}

/**
* Get page number if valid.
*
* @param pageNumber String?
* @param numberOfPages Int
* @return Int
* @throws BadRequestException
*/
@Throws(BadRequestException::class)
private fun getPageNumber(pageNumber: String?, numberOfPages: Int) = run {
var pageNumberAsInt = DEFAULT_PAGE_NUMBER
pageNumber?.run {
var issue = false
try {
pageNumberAsInt = pageNumber.toInt()
if (pageNumberAsInt < MIN_PAGE_NUMBER || pageNumberAsInt > numberOfPages)
issue = true
} catch (e: NumberFormatException) {
issue = true
}

if (issue) {
throw BadRequestException("page_number must be between $MIN_PAGE_NUMBER and $numberOfPages")
}
}
pageNumberAsInt
}

companion object {
private const val MIN_PAGE_SIZE = 1
private const val MAX_PAGE_SIZE = 10000
private const val DEFAULT_PAGE_NUMBER = 1

private const val MIN_PAGE_NUMBER = 1
private const val DEFAULT_PAGE_SIZE = 100

private const val DEFAULT_SORT_ORDER = "asc"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
package gov.cdc.ocio.processingstatusapi.model.reports

import com.google.gson.annotations.SerializedName
import gov.cdc.ocio.processingstatusapi.model.PageSummary

/**
* Aggregate report counts
*
* @property summary AggregateSummary?
* @property summary PageSummary?
* @property reportCountsList List<ReportCounts>?
* @constructor
*/
data class AggregateReportCounts(

@SerializedName("summary")
var summary: AggregateSummary? = null,
var summary: PageSummary? = null,

@SerializedName("uploads")
var reportCountsList: List<ReportCounts>? = null
Expand Down
Loading

0 comments on commit 7332342

Please sign in to comment.