Skip to content

Commit

Permalink
Merge pull request #6 from hyperschwartz/jschwartz/add-event-streams
Browse files Browse the repository at this point in the history
More event stream changes
  • Loading branch information
hyperschwartz authored Feb 2, 2022
2 parents e00a7e2 + f726952 commit f54e1b8
Show file tree
Hide file tree
Showing 12 changed files with 147 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import io.provenance.invoice.services.OnboardInvoiceRequest
import io.provenance.invoice.testhelpers.IntTestBase
import io.provenance.invoice.util.extension.toProtoAny
import io.provenance.invoice.util.extension.toUuid
import io.provenance.invoice.util.extension.parseUuid
import io.provenance.invoice.util.extension.typedUnpack
import io.provenance.metadata.v1.MsgWriteRecordRequest
import io.provenance.metadata.v1.MsgWriteScopeRequest
import io.provenance.metadata.v1.MsgWriteSessionRequest
import io.provenance.scope.util.MetadataAddress
import io.provenance.scope.util.toUuid
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import kotlin.test.assertEquals
Expand Down Expand Up @@ -45,7 +45,7 @@ class InvoiceServiceTest : IntTestBase() {
)
assertEquals(
expected = response.scopeGenerationDetail.writeScopeRequest.typedUnpack<MsgWriteScopeRequest>().scopeUuid.let { scopeUuid ->
MetadataAddress.forScope(scopeUuid.toUuid()).toString()
MetadataAddress.forScope(scopeUuid.parseUuid()).toString()
},
actual = response.payablesContractExecutionDetail.scopeId,
message = "Expected the scope id to be derived from the write scope request",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package io.provenance.invoice.components

import io.provenance.invoice.config.eventstream.EventStreamConstants
import io.provenance.invoice.config.eventstream.EventStreamProperties
import io.provenance.invoice.services.EventHandlerService
import io.provenance.invoice.util.eventstream.external.EventBatch
import io.provenance.invoice.util.eventstream.external.EventStreamFactory
import io.provenance.invoice.util.eventstream.external.EventStreamResponseObserver
import io.provenance.invoice.util.eventstream.external.StreamEvent
import io.provenance.invoice.util.extension.wrapList
import mu.KLogging
import org.springframework.data.redis.core.RedisTemplate
Expand All @@ -16,10 +16,11 @@ import java.util.concurrent.TimeUnit

@Component
class EventStreamConsumer(
private val lockRegistry: LockRegistry,
private val redisTemplate: RedisTemplate<String, Long>,
private val eventHandlerService: EventHandlerService,
private val eventStreamFactory: EventStreamFactory,
private val eventStreamProperties: EventStreamProperties,
private val lockRegistry: LockRegistry,
private val redisTemplate: RedisTemplate<String, Long>,
) {
private companion object : KLogging() {
private const val EVENT_STREAM_CONSUMER = "event-stream-consumer-invoice"
Expand All @@ -34,7 +35,7 @@ class EventStreamConsumer(
try {
val responseObserver = EventStreamResponseObserver<EventBatch> { batch ->
// Handle each observed event
batch.events.forEach(::handleEvent)
batch.events.forEach(eventHandlerService::handleEvent)
redisTemplate.opsForValue().set(EVENT_STREAM_CONSUMER_HEIGHT, batch.height)
logger.info("Processed events and established new height: ${batch.height}")
}
Expand All @@ -60,13 +61,4 @@ class EventStreamConsumer(
}
}
}

private fun handleEvent(event: StreamEvent) {
event.attributes.singleOrNull { it.key == "PAYABLE_REGISTERED" }?.also { attribute ->
logger.info("Found a live one: ${attribute.value}")
} ?: run {
logger.info("Not related")
}
//logger.info("We did it! We saw an event! Hash: ${event.txHash}, Type: ${event.eventType}, Attribute Count: ${event.attributes.size}, Height: ${event.height}")
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package io.provenance.invoice.config.provenance

import com.google.common.io.BaseEncoding
import io.provenance.client.PbClient
import io.provenance.scope.encryption.ecies.ECUtils
import io.provenance.scope.encryption.model.DirectKeyRef
import io.provenance.scope.encryption.model.KeyRef
import io.provenance.scope.objectstore.client.OsClient
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import java.security.KeyPair

@Configuration
@EnableConfigurationProperties(value = [ProvenanceProperties::class])
Expand All @@ -13,4 +19,23 @@ class ProvenanceConfig {
chainId = provenanceProperties.chainId,
channelUri = provenanceProperties.channelUri,
)

@Bean
fun objectStore(provenanceProperties: ProvenanceProperties): ObjectStore = ObjectStore(
osClient = OsClient(
uri = provenanceProperties.objectStoreUri,
deadlineMs = provenanceProperties.objectStoreTimeoutMs,
),
oracleCredentials = KeyPair(
ECUtils.convertBytesToPublicKey(BaseEncoding.base64().decode(provenanceProperties.oraclePublicKey)),
ECUtils.convertBytesToPrivateKey(BaseEncoding.base64().decode(provenanceProperties.oraclePrivateKey)),
)
)
}

data class ObjectStore(
val osClient: OsClient,
val oracleCredentials: KeyPair,
) {
val keyRef: KeyRef by lazy { DirectKeyRef(oracleCredentials.public, oracleCredentials.private) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,7 @@ data class ProvenanceProperties(
val chainId: String,
val channelUri: URI,
val oraclePublicKey: String,
val oraclePrivateKey: String,
val objectStoreUri: URI,
val objectStoreTimeoutMs: Long,
)
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import io.provenance.invoice.InvoiceProtos.Invoice
import io.provenance.invoice.domain.entities.InvoiceRecord
import io.provenance.invoice.util.enums.InvoiceProcessingStatus
import io.provenance.invoice.util.extension.totalAmount
import io.provenance.metadata.v1.MsgWriteRecordRequest
import io.provenance.metadata.v1.MsgWriteScopeRequest
import io.provenance.metadata.v1.MsgWriteSessionRequest
import java.math.BigDecimal
import java.time.OffsetDateTime
import java.util.UUID
Expand All @@ -14,6 +17,9 @@ data class InvoiceDto(
val invoice: Invoice,
val processingStatus: InvoiceProcessingStatus,
val totalOwed: BigDecimal,
val writeScopeRequest: MsgWriteScopeRequest,
val writeSessionRequest: MsgWriteSessionRequest,
val writeRecordRequest: MsgWriteRecordRequest,
val created: OffsetDateTime,
val updated: OffsetDateTime?,
) {
Expand All @@ -23,6 +29,9 @@ data class InvoiceDto(
invoice = record.invoice,
processingStatus = record.processingStatus,
totalOwed = record.invoice.totalAmount(),
writeScopeRequest = record.writeScopeRequest,
writeSessionRequest = record.writeSessionRequest,
writeRecordRequest = record.writeRecordRequest,
created = record.createdTime,
updated = record.updatedTime,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ class InvoiceRepository {
fun findByUuid(uuid: UUID): Invoice = findByUuidOrNull(uuid)
?: throw ResourceNotFoundException("Failed to find invoice by uuid [$uuid]")

fun findDtoByUuidOrNull(uuid: UUID): InvoiceDto? = transaction { InvoiceRecord.findById(uuid)?.toDto() }

fun findDtoByUuid(uuid: UUID): InvoiceDto = findDtoByUuidOrNull(uuid)
?: throw ResourceNotFoundException("Failed to find invoice dto by uuid [$uuid]")

fun insert(
invoice: Invoice,
status: InvoiceProcessingStatus,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package io.provenance.invoice.services

import com.google.common.util.concurrent.FutureCallback
import com.google.common.util.concurrent.Futures
import io.provenance.invoice.AssetProtos
import io.provenance.invoice.AssetProtos.Asset
import io.provenance.invoice.config.provenance.ObjectStore
import io.provenance.invoice.repository.InvoiceRepository
import io.provenance.invoice.util.eventstream.external.StreamEvent
import io.provenance.invoice.util.extension.checkNotNull
import io.provenance.invoice.util.extension.parseUuid
import io.provenance.invoice.util.extension.unpackInvoice
import io.provenance.invoice.util.validation.InvoiceValidator
import io.provenance.scope.encryption.domain.inputstream.DIMEInputStream
import io.provenance.scope.sdk.extensions.resultHash
import mu.KLogging
import org.springframework.stereotype.Service
import java.util.concurrent.Executors

@Service
class EventHandlerService(
private val objectStore: ObjectStore,
private val invoiceRepository: InvoiceRepository,
) {

private companion object : KLogging() {
private const val PAYABLE_REGISTRATION_KEY: String = "PAYABLE_REGISTERED"
private const val ORACLE_APPROVED_KEY: String = "ORACLE_APPROVED"
private const val PAYMENT_MADE_KEY: String = "PAYMENT_MADE"
}

fun handleEvent(event: StreamEvent) {
val eventKeys = event.attributes.map { it.key }
try {
when {
PAYABLE_REGISTRATION_KEY in eventKeys -> handleInvoiceRegisteredEvent(event)
ORACLE_APPROVED_KEY in eventKeys -> handleOracleApprovedEvent(event)
PAYMENT_MADE_KEY in eventKeys -> handlePaymentMadeEvent(event)
}
} catch (e: Exception) {
logger.error("Failed to process event with hash [${event.txHash}] and type [${event.eventType}] at height [${event.height}]", e)
}
}

private fun handleInvoiceRegisteredEvent(event: StreamEvent) {
val invoiceUuid = event.attributeValue(PAYABLE_REGISTRATION_KEY)
.also { logger.info("Handling invoice registration event for invoice uuid [$it]") }
.parseUuid()
val invoiceDto = invoiceRepository.findDtoByUuid(invoiceUuid)
val assetHash = invoiceDto.writeRecordRequest.record.resultHash()
val getFuture = objectStore.osClient.get(
hash = assetHash,
publicKey = objectStore.oracleCredentials.public,
)
Futures.addCallback(getFuture, object: FutureCallback<DIMEInputStream> {
override fun onSuccess(result: DIMEInputStream?) {
result.checkNotNull { "Null DIMEInputStream received from object store query for invoice [$invoiceUuid]" }
.getDecryptedPayload(objectStore.keyRef).use { signatureStream ->
val messageBytes = signatureStream.readAllBytes()
val targetInvoice = Asset.parseFrom(messageBytes).unpackInvoice()
InvoiceValidator.validateInvoice(targetInvoice)
logger.info("Successfully validated invoice from object store [${targetInvoice.invoiceUuid.value}]")
}
}
override fun onFailure(t: Throwable) {
logger.error("Failed to receive invoice [$invoiceUuid] DIME stream from object store")
}
}, Executors.newSingleThreadExecutor())
}

private fun handleOracleApprovedEvent(event: StreamEvent) {
logger.info("Handling oracle approved event")
}

private fun handlePaymentMadeEvent(event: StreamEvent) {
logger.info("Handling payment made event")
}

private fun StreamEvent.attributeValueOrNull(key: String): String? = attributes.singleOrNull { it.key == key }?.value

private fun StreamEvent.attributeValue(key: String): String = attributeValueOrNull(key)
.checkNotNull { "Unable to find stream attribute with key [$key]" }
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import com.google.protobuf.Any
import io.provenance.invoice.InvoiceProtos.Invoice
import io.provenance.invoice.domain.wallet.WalletDetails
import io.provenance.scope.util.MetadataAddress
import io.provenance.scope.util.toUuid
import mu.KLogging
import org.springframework.stereotype.Service
import io.provenance.invoice.repository.InvoiceRepository
import io.provenance.invoice.util.enums.InvoiceProcessingStatus
import io.provenance.invoice.util.extension.toAsset
import io.provenance.invoice.util.extension.toProtoAny
import io.provenance.invoice.util.extension.toUuid
import io.provenance.invoice.util.extension.parseUuid
import io.provenance.invoice.util.validation.InvoiceValidator
import java.math.BigDecimal
import java.util.UUID
Expand Down Expand Up @@ -48,7 +48,7 @@ class InvoiceService(
invoice = upsertedInvoice.invoice,
payablesContractExecutionDetail = PayablesContractExecutionDetail(
payableUuid = upsertedInvoice.uuid,
scopeId = MetadataAddress.forScope(assetOnboardingResponse.writeScopeRequest.scopeUuid.toUuid()).toString(),
scopeId = MetadataAddress.forScope(assetOnboardingResponse.writeScopeRequest.scopeUuid.parseUuid()).toString(),
invoiceTotal = upsertedInvoice.totalOwed,
invoiceDenom = upsertedInvoice.invoice.paymentDenom,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import io.provenance.invoice.clients.OnboardingResponse
import io.provenance.invoice.util.extension.toJsonProvenance
import io.provenance.invoice.util.extension.toProtoAny
import io.provenance.invoice.util.extension.toUuid
import io.provenance.invoice.util.extension.parseUuid
import io.provenance.metadata.v1.MsgWriteRecordRequest
import io.provenance.metadata.v1.MsgWriteScopeRequest
import io.provenance.metadata.v1.MsgWriteSessionRequest
Expand All @@ -28,7 +29,6 @@ import io.provenance.scope.encryption.ecies.ProvenanceKeyGenerator
import io.provenance.scope.objectstore.client.SIGNATURE_PUBLIC_KEY_FIELD_NAME
import io.provenance.scope.util.MetadataAddress
import io.provenance.scope.util.toByteString
import io.provenance.scope.util.toUuid
import java.io.ByteArrayInputStream
import java.security.PublicKey
import java.util.Base64
Expand All @@ -38,8 +38,8 @@ import java.util.UUID
* Code is a hack to somewhat replicate the responses that service-asset-onboarding returns, without using object store.
*/
object AssetOnboardingMocker {
private val SCOPE_SPEC_UUID = "551b5eca-921d-4ba7-aded-3966b224f44b".toUuid()
private val CONTRACT_SPEC_UUID = "f97ecc5d-c580-478d-be02-6c1b0c32235f".toUuid()
private val SCOPE_SPEC_UUID = "551b5eca-921d-4ba7-aded-3966b224f44b".parseUuid()
private val CONTRACT_SPEC_UUID = "f97ecc5d-c580-478d-be02-6c1b0c32235f".parseUuid()
private const val RECORD_SPEC_NAME = "Asset"
private val ASSET_SPEC_INPUT = RecordInputSpec(
name = "AssetHash",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package io.provenance.invoice.util.extension

import java.time.OffsetDateTime
import java.util.UUID

fun String.toOffsetDateTime(): OffsetDateTime = OffsetDateTime.parse(this)
fun String.toOffsetDateTimeOrNull(): OffsetDateTime? = tryOrNull(::toOffsetDateTime)

fun String.parseUuidOrNull(): UUID? = tryOrNull { UUID.fromString(this) }
fun String.parseUuid(): UUID = parseUuidOrNull().checkNotNull { "Unable to parse value [$this] to a valid UUID" }
3 changes: 3 additions & 0 deletions server/src/main/resources/application-container.properties
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ spring.redis.port=${REDIS_PORT}
provenance.chain_id=${PROVENANCE_CHAIN_ID}
provenance.channel_uri=${PROVENANCE_CHANNEL_URI}
provenance.oracle_public_key=${PROVENANCE_ORACLE_PUBLIC_KEY}
provenance.oracle_private_key=${PROVENANCE_ORACLE_PRIVATE_KEY}
provenance.object_store_uri=${OBJECT_STORE_URI}
provenance.object_store_timeout_ms=${OBJECT_STORE_TIMEOUT_MS}

# Event Stream
event.stream.websocket_uri=${EVENT_STREAM_WEBSOCKET_URI}
Expand Down
3 changes: 3 additions & 0 deletions server/src/main/resources/application-development.properties
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ spring.redis.port=6379
provenance.chain_id=localnet
provenance.channel_uri=http://localhost:9090
provenance.oracle_public_key=A6H7lr5aObpuSnHcisRcDDxsFAbmRXSvJIeErpEInUYX
provenance.oracle_private_key=nice-try-hackers
provenance.object_store_uri=grpc://localhost:8081
provenance.object_store_timeout_ms=20000

# Event Stream
event.stream.websocket_uri=ws://localhost:26657
Expand Down

0 comments on commit f54e1b8

Please sign in to comment.