Skip to content

Commit

Permalink
Merge pull request #8 from hyperschwartz/jschwartz/async-processing-f…
Browse files Browse the repository at this point in the history
…or-invoices-and-better-oracle-approval

Add coroutine launches for event processing and failure retries
  • Loading branch information
hyperschwartz authored Feb 9, 2022
2 parents a8bf8d9 + 688bb7a commit 342fef3
Show file tree
Hide file tree
Showing 11 changed files with 126 additions and 25 deletions.
1 change: 1 addition & 0 deletions buildSrc/src/main/kotlin/Dependencies.kt
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ object Dependencies {
val CoroutinesCoreJvm = DependencySpec("org.jetbrains.kotlinx:kotlinx-coroutines-core-jvm", Versions.KotlinCoroutines)
val CoroutinesReactor = DependencySpec("org.jetbrains.kotlinx:kotlinx-coroutines-reactor", Versions.KotlinCoroutines)
val CoroutinesJdk8 = DependencySpec("org.jetbrains.kotlinx:kotlinx-coroutines-jdk8", Versions.KotlinCoroutines)
val CoroutinesSLF4J = DependencySpec("org.jetbrains.kotlinx:kotlinx-coroutines-slf4j", Versions.KotlinCoroutines)
}

// Spring
Expand Down
1 change: 1 addition & 0 deletions server/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ dependencies {
Dependencies.Kotlin.CoroutinesCoreJvm,
Dependencies.Kotlin.CoroutinesJdk8,
Dependencies.Kotlin.CoroutinesReactor,
Dependencies.Kotlin.CoroutinesSLF4J,
Dependencies.Kotlin.Reflect,
Dependencies.Kotlin.StdLibJdk8,
Dependencies.KotlinLogging,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package io.provenance.invoice.components

import io.provenance.invoice.config.app.Qualifiers
import io.provenance.invoice.config.eventstream.EventStreamConstants
import io.provenance.invoice.config.eventstream.EventStreamProperties
import io.provenance.invoice.services.EventHandlerService
import io.provenance.invoice.util.coroutine.launchI
import io.provenance.invoice.util.eventstream.external.EventBatch
import io.provenance.invoice.util.eventstream.external.EventStreamFactory
import io.provenance.invoice.util.eventstream.external.EventStreamResponseObserver
import io.provenance.invoice.util.provenance.PayableContractKey
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.runBlocking
import mu.KLogging
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.data.redis.core.RedisTemplate
import org.springframework.integration.support.locks.LockRegistry
import org.springframework.scheduling.annotation.Scheduled
Expand All @@ -19,6 +24,7 @@ class EventStreamConsumer(
private val eventHandlerService: EventHandlerService,
private val eventStreamFactory: EventStreamFactory,
private val eventStreamProperties: EventStreamProperties,
@Qualifier(Qualifiers.EVENT_STREAM_COROUTINE_SCOPE) private val eventStreamScope: CoroutineScope,
private val lockRegistry: LockRegistry,
private val redisTemplate: RedisTemplate<String, Long>,
) {
Expand All @@ -39,7 +45,14 @@ class EventStreamConsumer(
try {
val responseObserver = EventStreamResponseObserver<EventBatch> { batch ->
// Handle each observed event
batch.events.forEach(eventHandlerService::handleEvent)
runBlocking {
batch.events
// Asynchronously process events in batches to allow faster execution
.map { event -> eventStreamScope.launchI { eventHandlerService.handleEvent(event) } }
// Do not let the code proceed without rejoining the main thread, ensuring that all events
// are processed before moving to the next height
.forEach { it.join() }
}
redisTemplate.opsForValue().set(EVENT_STREAM_CONSUMER_HEIGHT, batch.height)
logger.info("Processed events and established new height: ${batch.height}")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package io.provenance.invoice.components

import io.provenance.invoice.config.app.Qualifiers
import io.provenance.invoice.config.app.ServiceProperties
import io.provenance.invoice.repository.FailedEventRepository
import io.provenance.invoice.services.EventHandlerService
import io.provenance.invoice.util.coroutine.launchI
import io.provenance.invoice.util.eventstream.external.RpcClient
import io.provenance.invoice.util.extension.toStreamEventI
import io.provenance.invoice.util.provenance.PayableContractKey
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.runBlocking
import mu.KLogging
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
import java.util.concurrent.TimeUnit
Expand All @@ -19,6 +24,7 @@ import java.util.concurrent.TimeUnit
class FailStateRetryer(
private val eventHandlerService: EventHandlerService,
private val failedEventRepository: FailedEventRepository,
@Qualifier(Qualifiers.FAILURE_RETRY_COROUTINE_SCOPE) private val failureRetryScope: CoroutineScope,
private val rpcClient: RpcClient,
private val serviceProperties: ServiceProperties,
) {
Expand All @@ -41,23 +47,32 @@ class FailStateRetryer(
return
}
logger.info("$logPrefix Found [${failedEventTxHashes.size}] failed event(s) to retry")
failedEventTxHashes.forEach { txHash ->
logger.info("$logPrefix Fetching transaction details for event hash [$txHash]")
try {
rpcClient.getTransaction(txHash)
} catch (e: Exception) {
logger.error("$logPrefix Failed to fetch transaction for hash [$txHash]", e)
null
}?.also { txResponse ->
logger.info("$logPrefix Retrying event with hash [$txHash]")
txResponse
.result
?.txResult
?.events
?.filter { it.type == CONTRACT_EVENT_TYPE }
?.filter { it.attributes.any { attribute -> attribute.key in PayableContractKey.EVENT_KEYS_CONTRACT_NAMES } }
?.forEach { actionableEvent -> eventHandlerService.handleEvent(event = actionableEvent.toStreamEventI(txResponse.result), isRetry = true) }
}
// Block when launching async processes to ensure this function doesn't exit early. Spring will only spin up
// a new instance of this scheduled task once the previous one has completed
runBlocking {
failedEventTxHashes.map { txHash ->
// Launch a new scope per tx hash, allowing them to process asynchronously and hopefully execute more
// quickly than a sequential loop
failureRetryScope.launchI {
logger.info("$logPrefix Fetching transaction details for event hash [$txHash]")
try {
rpcClient.getTransaction(txHash)
} catch (e: Exception) {
logger.error("$logPrefix Failed to fetch transaction for hash [$txHash]", e)
null
}?.also { txResponse ->
logger.info("$logPrefix Retrying event with hash [$txHash]")
txResponse
.result
?.txResult
?.events
?.filter { it.type == CONTRACT_EVENT_TYPE }
?.filter { it.attributes.any { attribute -> attribute.key in PayableContractKey.EVENT_KEYS_CONTRACT_NAMES } }
?.forEach { actionableEvent -> eventHandlerService.handleEvent(event = actionableEvent.toStreamEventI(txResponse.result), isRetry = true) }
}
}
// Rejoin the main thread to ensure the coroutines don't exit before their completion
}.forEach { it.join() }
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.provenance.invoice.config.app

import com.fasterxml.jackson.databind.ObjectMapper
import io.provenance.invoice.util.coroutine.CoroutineUtil
import kotlinx.coroutines.CoroutineScope
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
Expand All @@ -12,4 +14,16 @@ class AppConfig {
@Primary
@Bean
fun objectMapper(): ObjectMapper = ConfigurationUtil.DEFAULT_OBJECT_MAPPER

@Bean(Qualifiers.EVENT_STREAM_COROUTINE_SCOPE)
fun eventStreamCoroutineScope(): CoroutineScope = CoroutineUtil.newSingletonScope(
scopeName = CoroutineScopeNames.EVENT_STREAM_SCOPE,
threadCount = 10,
)

@Bean(Qualifiers.FAILURE_RETRY_COROUTINE_SCOPE)
fun failureRetryCoroutineScope(): CoroutineScope = CoroutineUtil.newSingletonScope(
scopeName = CoroutineScopeNames.FAILURE_RETRY_SCOPE,
threadCount = 10,
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.provenance.invoice.config.app

object CoroutineScopeNames {
const val EVENT_STREAM_SCOPE = "event-stream-scope"
const val FAILURE_RETRY_SCOPE = "failure-retry-scope"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@ package io.provenance.invoice.config.app

object Qualifiers {
const val ORACLE_ACCOUNT_DETAIL: String = "oracleAccountDetail"
const val EVENT_STREAM_COROUTINE_SCOPE: String = "eventStreamCoroutineScope"
const val FAILURE_RETRY_COROUTINE_SCOPE: String = "failureRetryCoroutineScope"
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package io.provenance.invoice.services

import com.google.common.util.concurrent.FutureCallback
import com.google.common.util.concurrent.Futures
import io.provenance.invoice.AssetProtos.Asset
import io.provenance.invoice.config.provenance.ObjectStore
import io.provenance.invoice.factory.InvoiceCalcFactory
Expand All @@ -12,11 +10,9 @@ import io.provenance.invoice.util.enums.ExpectedPayableType
import io.provenance.invoice.util.enums.InvoiceStatus
import io.provenance.invoice.util.eventstream.external.StreamEvent
import io.provenance.invoice.util.extension.attributeValueI
import io.provenance.invoice.util.extension.checkNotNullI
import io.provenance.invoice.util.extension.unpackInvoiceI
import io.provenance.invoice.util.provenance.PayableContractKey
import io.provenance.invoice.util.validation.InvoiceValidator
import io.provenance.scope.encryption.domain.inputstream.DIMEInputStream
import io.provenance.scope.sdk.extensions.resultHash
import mu.KLogging
import org.springframework.stereotype.Service
Expand Down Expand Up @@ -88,6 +84,7 @@ class EventHandlerService(
objectStore
.osClient
.get(hash = assetHash, publicKey = objectStore.oracleAccountDetail.publicKey)
// Timeout after two minutes of failure to retrieve object
.get(2, TimeUnit.MINUTES)
.getDecryptedPayload(objectStore.oracleAccountDetail.keyRef)
.use { signatureStream ->
Expand All @@ -112,7 +109,23 @@ class EventHandlerService(
}

fun handleOracleApprovedEvent(event: IncomingInvoiceEvent) {
logger.info("Handling oracle approved event")
val logPrefix = "ORACLE APPROVAL [${event.invoiceUuid}]:"
logger.info("$logPrefix Handling oracle approved event")
when (val status = invoiceRepository.findDtoByUuid(event.invoiceUuid).status) {
InvoiceStatus.APPROVED -> {
logger.info("Skipping duplicate approval for invoice [${event.invoiceUuid}] and event hash [${event.streamEvent.txHash}]")
return
}
InvoiceStatus.REJECTED -> {
logger.error("Received oracle approval for rejected invoice [${event.invoiceUuid}] and event hash [${event.streamEvent.txHash}]. Skipping processing")
return
}
else -> {
logger.info("Processing oracle approval for invoice [${event.invoiceUuid}] with status [$status] and event hash [${event.streamEvent.txHash}]")
}
}
invoiceRepository.update(uuid = event.invoiceUuid, status = InvoiceStatus.APPROVED)
logger.info("$logPrefix Successfully marked invoice as oracle approved")
}

fun handlePaymentMadeEvent(event: IncomingInvoiceEvent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ class ProvenanceQueryService(
.checkNotNullI { "$logPrefix Null response received from oracle approval transaction" }
.txResponse
check(response.code == 0) { "$logPrefix Oracle approval transaction failed. Marking invoice as failed. Error log from Provenance: ${response.rawLog}" }
logger.info("$logPrefix Oracle approval transaction succeeded. Marking invoice as approved")
invoiceRepository.update(uuid = invoiceUuid, status = InvoiceStatus.APPROVED)
logger.info("$logPrefix Oracle approval transaction succeeded")
} catch (e: Exception) {
approvalFailure(invoiceUuid)
throw OracleApprovalException("Oracle approval transaction failed to process", e)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.provenance.invoice.util.coroutine

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlinx.coroutines.slf4j.MDCContext
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

fun CoroutineScope.launchI(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit,
): Job = launch(context + MDCContext(), start, block)
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.provenance.invoice.util.coroutine

import io.micrometer.core.instrument.util.NamedThreadFactory
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.asCoroutineDispatcher
import mu.KotlinLogging
import java.util.concurrent.Executors

object CoroutineUtil {
private val logger = KotlinLogging.logger {}

fun newSingletonScope(
scopeName: String,
threadCount: Int,
) : CoroutineScope = Executors.newFixedThreadPool(threadCount, NamedThreadFactory(scopeName))
.asCoroutineDispatcher()
.plus(CoroutineExceptionHandler { _, throwable -> logger.error("Coroutine execution failed", throwable) })
.plus(SupervisorJob())
.let(::CoroutineScope)
}

0 comments on commit 342fef3

Please sign in to comment.