Skip to content

Commit

Permalink
Added class and function headers
Browse files Browse the repository at this point in the history
Added missing stageName from dead-letter report
  • Loading branch information
manu-govind committed Jun 12, 2024
1 parent f89aa0d commit b389b09
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -244,11 +244,24 @@ class ReportManager: KoinComponent {

throw BadStateException("Failed to create reportId = ${stageReport.reportId}, uploadId = $uploadId")
}
/**
* Creates a dead-letter report if there is a malformed data or missing required fields
*
* @param uploadId String
* @param dataStreamId String
* @param dataStreamRoute String
* @param stageName String
* @param contentType String
* @param content String
* @return String
* @throws BadStateException
*/

@Throws(BadStateException::class)
fun createDeadLetterReport(uploadId: String?,
dataStreamId: String?,
dataStreamRoute: String?,
stageName: String?,
dispositionType: DispositionType,
contentType: String?,
content: Any?,
Expand All @@ -261,6 +274,7 @@ class ReportManager: KoinComponent {
this.uploadId = uploadId
this.dataStreamId = dataStreamId
this.dataStreamRoute = dataStreamRoute
this.stageName= stageName
this.dispositionType= dispositionType.toString()
this.contentType = contentType
this.deadLetterReasons= deadLetterReasons
Expand Down Expand Up @@ -323,6 +337,10 @@ class ReportManager: KoinComponent {
throw BadStateException("Failed to create dead-letterReport reportId = ${deadLetterReport.reportId}, uploadId = $uploadId")
}

/**
* The function which calculates the interval after which the retry should occur
* @param attempt Int
*/
private fun getCalculatedRetryDuration(attempt: Int): Long {
return DEFAULT_RETRY_INTERVAL_MILLIS * (attempt + 1)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import java.time.LocalDateTime
import java.util.*

/**
* Report for a given stage.
* Dead-LetterReport when there is missing fields or malformed data.
*
* @property uploadId String?
* @property reportId String?
Expand Down Expand Up @@ -34,6 +34,9 @@ data class ReportDeadLetter(
@SerializedName("data_stream_route")
var dataStreamRoute: String? = null,

@SerializedName("stage_name")
var stageName: String? = null,

@SerializedName("disposition_type")
var dispositionType: String? = null,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ import java.util.concurrent.TimeUnit

internal val LOGGER = KtorSimpleLogger("pstatus-report-sink")

/**
* Class which initializes configuration values
* @param config ApplicationConfig
*
*/
class AzureServiceBusConfiguration(config: ApplicationConfig) {
var connectionString: String = config.tryGetString("connection_string") ?: ""
var serviceBusNamespace: String = config.tryGetString("azure_servicebus_namespace") ?: ""
Expand Down Expand Up @@ -60,7 +65,12 @@ val AzureServiceBus = createApplicationPlugin(
.buildProcessorClient()
}

// handles received messages
/**
* Function which starts receiving messages from queues and topics
* @throws AmqpException
* @throws TransportException
* @throws Exception generic
*/
@Throws(InterruptedException::class)
fun receiveMessages() {
try {
Expand Down Expand Up @@ -99,6 +109,13 @@ val AzureServiceBus = createApplicationPlugin(
}
}

/**
* Function which processes the message received in the queue or topics
* @param context ServiceBusReceivedMessageContext
* @throws BadRequestException
* @throws IllegalArgumentException
* @throws Exception generic
*/
private fun processMessage(context: ServiceBusReceivedMessageContext) {
val message = context.message

Expand Down Expand Up @@ -127,6 +144,11 @@ private fun processMessage(context: ServiceBusReceivedMessageContext) {
}

}

/**
* Function to handle and process the error generated during the processing of messages from queue or topics
* @param context ServiceBusErrorContext
*/
private fun processError(context: ServiceBusErrorContext) {
System.out.printf(
"Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
Expand Down Expand Up @@ -160,6 +182,9 @@ private fun processError(context: ServiceBusErrorContext) {
}
}

/**
* The main application module which runs always
*/
fun Application.serviceBusModule() {
install(AzureServiceBus) {
// any additional configuration goes here
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,11 @@ class ServiceBusProcessor {

}

// Function to validate report
/**
* Function to validate report attributes for missing required fields
* @param createReportMessage CreateReportSBMessage
* @throws BadRequestException
*/
private fun validateReport(createReportMessage: CreateReportSBMessage) {
val missingFields = mutableListOf<String>()

Expand Down Expand Up @@ -131,6 +135,7 @@ class ServiceBusProcessor {
createReportMessage.uploadId,
createReportMessage.dataStreamId,
createReportMessage.dataStreamRoute,
createReportMessage.stageName,
createReportMessage.dispositionType,
createReportMessage.contentType,
createReportMessage.content,
Expand All @@ -141,6 +146,9 @@ class ServiceBusProcessor {
}
}

/** Function to check whether the value is null or empty based on its type
* @param value Any
*/
private fun isNullOrEmpty(value: Any?): Boolean {
return when (value) {
null -> true
Expand Down

0 comments on commit b389b09

Please sign in to comment.