diff --git a/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/signing/MessageSigner.kt b/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/signing/MessageSigner.kt index f906c76..690655a 100644 --- a/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/signing/MessageSigner.kt +++ b/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/signing/MessageSigner.kt @@ -171,17 +171,23 @@ class MessageSigner(properties: MessageSigningProperties) { * @throws UncheckedSecurityException if the signature verification process throws a * SignatureException. */ - fun verifyUsingField(message: SignableMessageWrapper<*>): Boolean { + fun verifyUsingField(message: SignableMessageWrapper): T { check(this.canVerifyMessageSignatures()) { "This MessageSigner is not configured for verification, it can only be used for signing" } - val messageSignature = message.getSignature() ?: return false + val messageSignature = message.getSignature() ?: throw IllegalStateException( + "This message does not contain a signature" + ) messageSignature.mark() val signatureBytes = ByteArray(messageSignature.remaining()) messageSignature[signatureBytes] try { message.setSignature(null) - return this.verifySignatureBytes(signatureBytes, this.toByteBuffer(message)) + if(this.verifySignatureBytes(signatureBytes, this.toByteBuffer(message))) { + return message.message + } else { + throw VerificationException("Verification of message signing failed") + } } catch (e: SignatureException) { throw UncheckedSecurityException("Unable to verify message signature", e) } finally { @@ -190,14 +196,6 @@ class MessageSigner(properties: MessageSigningProperties) { } } - fun verifyUsingFieldThrowingException(message: SignableMessageWrapper): T { - if(this.verifyUsingField(message)) { - return message.message - } else { - throw VerificationException("Verification of message signing failed") - } - } - /** * Verifies the signature of the provided `consumerRecord` using the signature from the message header. * @@ -210,7 +208,7 @@ class MessageSigner(properties: MessageSigningProperties) { * @throws UncheckedSecurityException if the signature verification process throws a * SignatureException. */ - fun verifyUsingHeader(consumerRecord: ConsumerRecord): Boolean { + fun verifyUsingHeader(consumerRecord: ConsumerRecord): ConsumerRecord { check(this.canVerifyMessageSignatures()) { "This MessageSigner is not configured for verification, it can only be used for signing" } val header = consumerRecord.headers().lastHeader(RECORD_HEADER_KEY_SIGNATURE) @@ -223,23 +221,16 @@ class MessageSigner(properties: MessageSigningProperties) { try { consumerRecord.headers().remove(RECORD_HEADER_KEY_SIGNATURE) val specificRecordBase: SpecificRecordBase = consumerRecord.value() - return this.verifySignatureBytes( - signatureBytes, - this.toByteBuffer(specificRecordBase) - ) + if(this.verifySignatureBytes(signatureBytes, this.toByteBuffer(specificRecordBase))) { + return consumerRecord + } else { + throw VerificationException("Verification of record signing failed") + } } catch (e: SignatureException) { throw UncheckedSecurityException("Unable to verify message signature", e) } } - fun verifyUsingHeaderThrowingException(consumerRecord: ConsumerRecord): ConsumerRecord { - if(this.verifyUsingHeader(consumerRecord)) { - return consumerRecord - } else { - throw VerificationException("Verification of record signing failed") - } - } - @Throws(SignatureException::class) private fun verifySignatureBytes(signatureBytes: ByteArray, messageByteBuffer: ByteBuffer?): Boolean { val messageBytes: ByteArray = if (this.stripAvroHeader) { diff --git a/kafka-message-signing/src/test/kotlin/com/gxf/utilities/kafka/message/signing/MessageSignerTest.kt b/kafka-message-signing/src/test/kotlin/com/gxf/utilities/kafka/message/signing/MessageSignerTest.kt index a2a9434..2d58fba 100644 --- a/kafka-message-signing/src/test/kotlin/com/gxf/utilities/kafka/message/signing/MessageSignerTest.kt +++ b/kafka-message-signing/src/test/kotlin/com/gxf/utilities/kafka/message/signing/MessageSignerTest.kt @@ -12,9 +12,6 @@ import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.header.Header import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Test -import org.mockito.kotlin.spy -import org.mockito.kotlin.times -import org.mockito.kotlin.verify import org.springframework.core.io.ClassPathResource import java.nio.ByteBuffer import java.nio.charset.StandardCharsets @@ -90,115 +87,89 @@ class MessageSignerTest { val signatureWasVerified = messageSigner.verifyUsingField(message) - assertThat(signatureWasVerified).isTrue() + assertThat(signatureWasVerified).isEqualTo(message.message) } @Test fun verifiesRecordsWithValidSignature() { val signedRecord = this.properlySignedRecord() - val signatureWasVerified: Boolean = messageSigner.verifyUsingHeader(signedRecord) + val result = messageSigner.verifyUsingHeader(signedRecord) - assertThat(signatureWasVerified).isTrue() - } - - @Test - fun verifiesRecordsWithValidSignatureThenPerformsAction() { - val signedRecord = this.properlySignedRecord() - val action = spy(object: Consumer> { - override fun accept(consumerRecord: ConsumerRecord) = println(consumerRecord) - }) - - messageSigner.doAfterVerifyUsingHeader(signedRecord, action) - - verify(action).accept(signedRecord) + assertThat(result).isEqualTo(signedRecord) } @Test fun doesNotVerifyMessagesWithoutSignature() { val messageWrapper = this.messageWrapper() - - val signatureWasVerified = messageSigner.verifyUsingField(messageWrapper) - - assertThat(signatureWasVerified).isFalse() - } - - @Test - fun doesNotVerifyRecordsWithoutSignature() { - val expectedMessage = "This ProducerRecord does not contain a signature header" - val consumerRecord = this.consumerRecord() + val expectedMessage = "This message does not contain a signature" val exception: Exception = org.junit.jupiter.api.Assertions.assertThrows( IllegalStateException::class.java ) { - messageSigner.verifyUsingHeader( - consumerRecord - ) + val message = messageSigner.verifyUsingField(messageWrapper) + assertThat(message).isNull() } + val actualMessage = exception.message assertThat(actualMessage).contains(expectedMessage) } @Test - fun doesNotVerifyRecordsWithoutSignatureThenDoesNotPerformAction() { + fun doesNotVerifyRecordsWithoutSignature() { val expectedMessage = "This ProducerRecord does not contain a signature header" val consumerRecord = this.consumerRecord() - val action = spy(object: Consumer> { - override fun accept(consumerRecord: ConsumerRecord) = println(consumerRecord) - }) val exception: Exception = org.junit.jupiter.api.Assertions.assertThrows( IllegalStateException::class.java ) { - messageSigner.doAfterVerifyUsingHeader(consumerRecord, action) + val record = messageSigner.verifyUsingHeader( + consumerRecord + ) + assertThat(record).isNull() } + val actualMessage = exception.message assertThat(actualMessage).contains(expectedMessage) - verify(action, times(0)).accept(consumerRecord) } @Test fun doesNotVerifyMessagesWithIncorrectSignature() { val randomSignature = this.randomSignature() val messageWrapper = this.messageWrapper(randomSignature) + val expectedMessage = "Verification of message signing failed" - val signatureWasVerified = messageSigner.verifyUsingField(messageWrapper) - - assertThat(signatureWasVerified).isFalse() - } - - @Test - fun doesNotVerifyRecordsWithIncorrectSignature() { - val consumerRecord = this.consumerRecord() - val randomSignature = this.randomSignature() - consumerRecord.headers().add(MessageSigner.RECORD_HEADER_KEY_SIGNATURE, randomSignature) + val exception: Exception = org.junit.jupiter.api.Assertions.assertThrows( + VerificationException::class.java + ) { + val message = messageSigner.verifyUsingField(messageWrapper) + assertThat(message).isNull() + } - val signatureWasVerified = messageSigner.verifyUsingHeader(consumerRecord) + val actualMessage = exception.message - assertThat(signatureWasVerified).isFalse() + assertThat(actualMessage).contains(expectedMessage) } @Test - fun doesNotVerifyRecordsWithIncorrectSignatureAndDoesNotPerformAction() { + fun doesNotVerifyRecordsWithIncorrectSignature() { val consumerRecord = this.consumerRecord() val randomSignature = this.randomSignature() consumerRecord.headers().add(MessageSigner.RECORD_HEADER_KEY_SIGNATURE, randomSignature) - val action = spy(object: Consumer> { - override fun accept(consumerRecord: ConsumerRecord) = println(consumerRecord) - }) val expectedMessage = "Verification of record signing failed" val exception: Exception = org.junit.jupiter.api.Assertions.assertThrows( VerificationException::class.java ) { - messageSigner.doAfterVerifyUsingHeader(consumerRecord, action) + val signatureWasVerified = messageSigner.verifyUsingHeader(consumerRecord) + assertThat(signatureWasVerified).isNull() } + val actualMessage = exception.message assertThat(actualMessage).contains(expectedMessage) - verify(action, times(0)).accept(consumerRecord) } @Test