
io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Invalid data in message: Message failed schema validation. #272

Open
dpbadiola opened this issue Jun 15, 2023 · 3 comments


dpbadiola commented Jun 15, 2023

I have a Kafka topic named source that contains Avro messages whose schema is already registered in the schema registry at http://localhost:9080. The messages were produced as shown below and can be viewed in the Kafka UI (through the schema registry), so I believe the schema registry is working as expected.

ProducerRecord<String, AvroCustomMessage> record = new ProducerRecord<>("source", "test", customMessage);
kafkaTemplate.send(record);
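
(For reference, producing Avro through the schema registry implies a serializer setup roughly like the sketch below. This is a simplified, hypothetical equivalent using a plain KafkaProducer rather than the actual Spring KafkaTemplate configuration; the broker address is illustrative.)

import java.util.Properties;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class AvroProducerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");               // illustrative broker address
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); // writes Confluent-framed Avro
    props.put("schema.registry.url", "http://localhost:9080");                          // registry mentioned above

    // AvroCustomMessage stands in for the generated Avro class used in the snippet above.
    AvroCustomMessage customMessage = new AvroCustomMessage();
    try (KafkaProducer<String, AvroCustomMessage> producer = new KafkaProducer<>(props)) {
      producer.send(new ProducerRecord<>("source", "test", customMessage));
    }
  }
}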

However, when I set up a Pub/Sub sink connector from the source topic to cps.topic.test using the connector config below, I get the following stack trace.

Connector config

{
  "name": "CONNECTOR_NAME"
  "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
  "headers.publish": true,
  "metadata.publish": true,
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "key.converter.schemas.enable": false,
  "key.converter.schemas.registry.url": "http://localhost:9081",
  "key.converter.enhanced.avro.schema.support": true,
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "value.converter.schemas.enable": false,
  "value.converter.schemas.registry.url":" http://localhost:9081",
  "value.converter.enhanced.avro.schema.support": true,
  "tasks.max": "1",
  "topics": "source",
  "cps.topic": "cps.topic.test",
  "cps.project": "cps.project.test",
  "cps.endpoint": "pubsub.googleapis.com:443",
  "gcp.credentials.file.path": "path/to/file.json"
}
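
(To clarify what this config publishes: value.converter is ByteArrayConverter, so the record value bytes are forwarded to Pub/Sub unchanged. Since the topic was written through the schema registry with an Avro serializer, each value presumably starts with the Confluent wire-format header, a 0x00 magic byte plus a 4-byte schema ID, rather than plain Avro binary. A rough, illustrative check over a raw consumed value, not part of the original setup:)

import java.nio.ByteBuffer;

public class WireFormatCheck {
  // value: a raw record value consumed with a ByteArrayDeserializer.
  static void describe(byte[] value) {
    if (value != null && value.length > 5 && value[0] == 0x00) {
      int schemaId = ByteBuffer.wrap(value, 1, 4).getInt();
      System.out.println("Confluent wire format: magic byte + schema id " + schemaId);
    } else {
      System.out.println("No Confluent wire-format header; plain bytes");
    }
  }
}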

Error

2023-06-15T14:19:03.967837839Z [2023-06-15 14:19:03,967] DEBUG [CONNECTOR_NAME|task-0] Flushing... (com.google.pubsub.kafka.sink.CloudPubSubSinkTask:329)
2023-06-15T14:19:03.967846553Z [2023-06-15 14:19:03,967] TRACE [CONNECTOR_NAME|task-0] Received flush for partition source-0 (com.google.pubsub.kafka.sink.CloudPubSubSinkTask:333)
2023-06-15T14:19:03.971307523Z [2023-06-15 14:19:03,968] ERROR [CONNECTOR_NAME|task-0] WorkerSinkTask{id=CONNECTOR_NAME-0} Offset commit failed, rewinding to last committed offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:409)
2023-06-15T14:19:03.971340639Z java.lang.RuntimeException: java.util.concurrent.ExecutionException: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Invalid data in message: Message failed schema validation.
2023-06-15T14:19:03.971372375Z 	at com.google.pubsub.kafka.sink.CloudPubSubSinkTask.flush(CloudPubSubSinkTask.java:347)
2023-06-15T14:19:03.971386314Z 	at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:125)
2023-06-15T14:19:03.971393094Z 	at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:404)
2023-06-15T14:19:03.971398732Z 	at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:374)
2023-06-15T14:19:03.971404374Z 	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:218)
2023-06-15T14:19:03.971409965Z 	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
2023-06-15T14:19:03.971414840Z 	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
2023-06-15T14:19:03.971419642Z 	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
2023-06-15T14:19:03.971424466Z 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2023-06-15T14:19:03.971429693Z 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2023-06-15T14:19:03.971434557Z 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2023-06-15T14:19:03.971439081Z 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2023-06-15T14:19:03.971443774Z 	at java.base/java.lang.Thread.run(Thread.java:829)
2023-06-15T14:19:03.971448940Z Caused by: java.util.concurrent.ExecutionException: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Invalid data in message: Message failed schema validation.
2023-06-15T14:19:03.971454003Z 	at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:588)
2023-06-15T14:19:03.971477053Z 	at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:547)
2023-06-15T14:19:03.971482768Z 	at com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:113)
2023-06-15T14:19:03.971490433Z 	at com.google.common.util.concurrent.ForwardingFuture.get(ForwardingFuture.java:66)
2023-06-15T14:19:03.971495828Z 	at com.google.pubsub.kafka.sink.CloudPubSubSinkTask.flush(CloudPubSubSinkTask.java:345)
2023-06-15T14:19:03.971500928Z 	... 12 more
2023-06-15T14:19:03.971505860Z Caused by: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Invalid data in message: Message failed schema validation.
2023-06-15T14:19:03.971511459Z 	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:92)
2023-06-15T14:19:03.971516728Z 	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:98)
2023-06-15T14:19:03.971557435Z 	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66)
2023-06-15T14:19:03.971571292Z 	at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
2023-06-15T14:19:03.971576787Z 	at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:67)
2023-06-15T14:19:03.971581325Z 	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1132)
2023-06-15T14:19:03.971648068Z 	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
2023-06-15T14:19:03.971654268Z 	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1270)
2023-06-15T14:19:03.971659263Z 	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1038)
2023-06-15T14:19:03.971663692Z 	at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:808)
2023-06-15T14:19:03.971668328Z 	at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:574)
2023-06-15T14:19:03.971687253Z 	at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:544)
2023-06-15T14:19:03.971693772Z 	at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
2023-06-15T14:19:03.971696819Z 	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
2023-06-15T14:19:03.971700104Z 	at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
2023-06-15T14:19:03.971704966Z 	at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:541)
2023-06-15T14:19:03.971710298Z 	at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:489)
2023-06-15T14:19:03.971715273Z 	at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:453)
2023-06-15T14:19:03.971732361Z 	at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:486)
2023-06-15T14:19:03.971740825Z 	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:576)
2023-06-15T14:19:03.971744216Z 	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
2023-06-15T14:19:03.971747150Z 	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:757)
2023-06-15T14:19:03.971750185Z 	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:736)
2023-06-15T14:19:03.971753106Z 	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
2023-06-15T14:19:03.971756012Z 	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
2023-06-15T14:19:03.971758846Z 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2023-06-15T14:19:03.971761676Z 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2023-06-15T14:19:03.971764526Z 	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
2023-06-15T14:19:03.971767486Z 	... 3 more
2023-06-15T14:19:03.971770336Z Caused by: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Invalid data in message: Message failed schema validation.
2023-06-15T14:19:03.971773264Z 	at io.grpc.Status.asRuntimeException(Status.java:539)
2023-06-15T14:19:03.971776331Z 	... 20 more
2023-06-15T14:19:03.971780978Z 

What could be the issue, and why am I getting this error message?

samarthsingal (Contributor) commented

We will need more details to look into this for you; specifically, we will need to see the Pub/Sub topic schema and the sample message you are trying to send. If your schema/data is confidential, you can try to reproduce the error with non-confidential data.
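
As a starting point, the same check Pub/Sub performs at publish time can be run directly against the topic's schema. Below is a rough sketch using the google-cloud-pubsub Java client; the project ID, schema ID, and payload are placeholders, and JSON encoding is assumed:

import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.Encoding;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.SchemaName;
import com.google.pubsub.v1.ValidateMessageRequest;
import java.nio.charset.StandardCharsets;

public class ValidateMessageSketch {
  public static void main(String[] args) throws Exception {
    String projectId = "YOUR_PROJECT_ID";               // placeholder
    String schemaId = "YOUR_SCHEMA_ID";                 // placeholder: schema attached to the Pub/Sub topic
    String payload = "{\"someField\": \"someValue\"}";  // placeholder sample message

    try (SchemaServiceClient client = SchemaServiceClient.create()) {
      ValidateMessageRequest request =
          ValidateMessageRequest.newBuilder()
              .setParent(ProjectName.of(projectId).toString())
              .setName(SchemaName.of(projectId, schemaId).toString())
              .setMessage(ByteString.copyFrom(payload, StandardCharsets.UTF_8))
              .setEncoding(Encoding.JSON)               // Encoding.BINARY for Avro binary payloads
              .build();
      // Fails with the same "Message failed schema validation" INVALID_ARGUMENT
      // error when the payload does not match the schema.
      client.validateMessage(request);
      System.out.println("Message passes schema validation");
    }
  }
}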


xsajkha commented Apr 4, 2024

I ran into a similar issue. I have an Avro schema for my Kafka topic and created the same schema for my Pub/Sub topic (I copied the schema from my Kafka schema registry and used it as the GCP Pub/Sub schema for the topic). I also tested the exact message that is in the Kafka topic by copying it and publishing it to the Pub/Sub topic through the GCP web console, and it was published successfully.
However, the sink connector (version 1.2.0) is failing...
Connector config:

{
	"connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
	"errors.log.include.messages": "true",
	"tasks.max": "1",
	"topics": "klarna.reports",
	"cps.project": "mycps-project-id",
	"gcp.credentials.file.path": "******",
	"key.converter.schemas.enable": "false",
	"value.converter.schema.registry.url": "http://127.0.0.1:8081",
	"name": "pubsub-sink-klarna-reports",
	"value.converter": "io.confluent.connect.avro.AvroConverter",
	"cps.topic": "klarna.reports",
	"errors.log.enable": "true",
	"key.converter": "org.apache.kafka.connect.storage.StringConverter"
}

Avro schema:

{
	"type": "record",
	"name": "KlarnaReport",
	"namespace": "se.org.klarna",
	"fields": [
		{
			"name": "applicantId",
			"type": [
				"null",
				{
					"type": "string",
					"avro.java.string": "String"
				}
			],
			"default": null
		},
		{
			"name": "createdAt",
			"type": [
				"null",
				{
					"type": "string",
					"avro.java.string": "String"
				}
			],
			"default": null
		},
		{
			"name": "accountInfosJson",
			"type": [
				"null",
				{
					"type": "string",
					"avro.java.string": "String"
				}
			],
			"default": null
		},
		{
			"name": "balancesJson",
			"type": [
				"null",
				{
					"type": "string",
					"avro.java.string": "String"
				}
			],
			"default": null
		},
		{
			"name": "balanceOverTimeJson",
			"type": [
				"null",
				{
					"type": "string",
					"avro.java.string": "String"
				}
			],
			"default": null
		},
		{
			"name": "categorizedTransactionsJson",
			"type": [
				"null",
				{
					"type": "string",
					"avro.java.string": "String"
				}
			],
			"default": null
		},
		{
			"name": "groupByCategoryJson",
			"type": [
				"null",
				{
					"type": "string",
					"avro.java.string": "String"
				}
			],
			"default": null
		},
		{
			"name": "recurringTransactionsJson",
			"type": [
				"null",
				{
					"type": "string",
					"avro.java.string": "String"
				}
			],
			"default": null
		},
		{
			"name": "accountJson",
			"type": [
				"null",
				{
					"type": "string",
					"avro.java.string": "String"
				}
			],
			"default": null
		},
		{
			"name": "transactionsJson",
			"type": [
				"null",
				{
					"type": "string",
					"avro.java.string": "String"
				}
			],
			"default": null
		},
		{
			"name": "source",
			"type": [
				"null",
				{
					"type": "string",
					"avro.java.string": "String"
				}
			],
			"default": null
		}
	]
}

Stacktrace:
{"timestamp":"2024-04-03 14:38:17,887","level":"ERROR","appName":"kafka-connect","thread":"task-thread-pubsub-sink-klarna-reports-0","class":"org.apache.kafka.connect.runtime.WorkerSinkTask","message":"WorkerSinkTask{id=pubsub-sink-klarna-reports-0} Commit of offsets threw an unexpected exception for sequence number 8: null java.lang.RuntimeException: java.util.concurrent.ExecutionException: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Invalid data in message: Message failed schema validation. at com.google.pubsub.kafka.sink.CloudPubSubSinkTask.flush(CloudPubSubSinkTask.java:352) at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:125) at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:378) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:208) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: java.util.concurrent.ExecutionException: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Invalid data in message: Message failed schema validation. at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:588) at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:547) at com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:113) at com.google.common.util.concurrent.ForwardingFuture.get(ForwardingFuture.java:66) at com.google.pubsub.kafka.sink.CloudPubSubSinkTask.flush(CloudPubSubSinkTask.java:350) ... 11 more Caused by: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Invalid data in message: Message failed schema validation. 
at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:92) at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:98) at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66) at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97) at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:67) at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1132) at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31) at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1270) at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1038) at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:808) at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:574) at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:544) at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:541) at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:576) at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70) at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:757) at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:736) at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ... 3 more Caused by: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Invalid data in message: Message failed schema validation. at io.grpc.Status.asRuntimeException(Status.java:539) ... 17 more "}

I have also tested the connector against a Pub/Sub topic with no schema attached, and in that case it does sink the data. However, the Kafka message payload is not stored in the Pub/Sub message body; instead, all the fields of the payload are exploded into message attributes.
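
If I read the connector docs correctly, that is the documented behavior for struct values: each field is mapped to a message attribute, and only the field whose name matches the messageBodyName setting (default cps_message_body) is placed in the message body. For example, a fragment in the style of the config above (accountJson is just one field from the schema, used for illustration):

{
	"messageBodyName": "accountJson"
}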

@samarthsingal looking forward to your feedback!


xsajkha commented Apr 4, 2024

I have also tested with a simple Avro schema that I created for both a new test Kafka topic and its Pub/Sub topic, as follows:

{
  "type": "record",
  "name": "TesPubsubSchemaRecord",
  "namespace": "se.org.common",
  "fields": [
    {
      "name": "id",
      "type": "int"
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "phone",
      "type": "string",
      "default": "123"
    },
    {
      "name": "lastname",
      "type": "string",
      "default": null
    }
  ]
}

I validated the following message payload against the Pub/Sub topic schema, and it validates successfully:

{
	"id": 66717451,
	"name": "velit et amet",
	"phone": "cupidatat deserunt nisi in",
	"lastname": "et ea do ad"
}

But I am still getting the same schema validation error from the sink connector.
