io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Invalid data in message: Message failed schema validation. #272

Open
@dpbadiola

I have a Kafka topic named source that contains Avro messages whose schema comes from the schema registry at http://localhost:9080. I produce the messages with the code below, and they can be viewed in the Kafka UI (through the schema registry), so I believe the schema registry is working as expected.

ProducerRecord<String, AvroCustomMessage> record = new ProducerRecord<>("source", "test", customMessage);
kafkaTemplate.send(record);

However, when I set up a Pub/Sub sink connector from the source topic to cps.topic.test using the config below, I get the following stack trace.

Connector config

{
  "name": "CONNECTOR_NAME",
  "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
  "headers.publish": true,
  "metadata.publish": true,
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "key.converter.schemas.enable": false,
  "key.converter.schemas.registry.url": "http://localhost:9081",
  "key.converter.enhanced.avro.schema.support": true,
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "value.converter.schemas.enable": false,
  "value.converter.schemas.registry.url": "http://localhost:9081",
  "value.converter.enhanced.avro.schema.support": true,
  "tasks.max": "1",
  "topics": "source",
  "cps.topic": "cps.topic.test",
  "cps.project": "cps.project.test",
  "cps.endpoint": "pubsub.googleapis.com:443",
  "gcp.credentials.file.path": "path/to/file.json"
}
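
One thing that may be worth checking with this config: ByteArrayConverter passes the Kafka record value through unchanged. If the producer uses Confluent's KafkaAvroSerializer (an assumption; the serializer is not shown above), each value starts with a 5-byte header, a magic byte 0 followed by a 4-byte big-endian schema ID, ahead of the Avro binary body. A Pub/Sub topic with an Avro schema attached would likely not accept that framed payload as-is. The sketch below, with the hypothetical class name WireFormatCheck and a made-up sample payload, shows how such a header could be detected and stripped. It is an illustration only, not the connector's behavior.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper for inspecting raw Kafka record values.
// Assumes the Confluent wire format: [magic byte 0][4-byte schema ID][Avro body].
public class WireFormatCheck {
    private static final int HEADER_LENGTH = 5;

    /** Returns the schema ID if the value carries the assumed header, otherwise -1. */
    public static int confluentSchemaId(byte[] value) {
        if (value == null || value.length < HEADER_LENGTH || value[0] != 0) {
            return -1;
        }
        // ByteBuffer reads big-endian by default, matching the assumed format.
        return ByteBuffer.wrap(value, 1, 4).getInt();
    }

    /** Returns the bytes after the header, or the value unchanged if no header is found. */
    public static byte[] stripHeader(byte[] value) {
        if (confluentSchemaId(value) < 0) {
            return value;
        }
        return Arrays.copyOfRange(value, HEADER_LENGTH, value.length);
    }

    public static void main(String[] args) {
        // Made-up payload: header with schema ID 42, then three body bytes.
        byte[] sample = {0, 0, 0, 0, 42, 8, 116, 101};
        System.out.println("schema id: " + confluentSchemaId(sample));
        System.out.println("body length: " + stripHeader(sample).length);
    }
}
```

If the header is present in the values of the source topic, the bytes reaching Pub/Sub would differ from a plain Avro binary encoding, which could be one explanation for the validation failure.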

Error

2023-06-15T14:19:03.967837839Z [2023-06-15 14:19:03,967] DEBUG [CONNECTOR_NAME|task-0] Flushing... (com.google.pubsub.kafka.sink.CloudPubSubSinkTask:329)
2023-06-15T14:19:03.967846553Z [2023-06-15 14:19:03,967] TRACE [CONNECTOR_NAME|task-0] Received flush for partition source-0 (com.google.pubsub.kafka.sink.CloudPubSubSinkTask:333)
2023-06-15T14:19:03.971307523Z [2023-06-15 14:19:03,968] ERROR [CONNECTOR_NAME|task-0] WorkerSinkTask{id=CONNECTOR_NAME-0} Offset commit failed, rewinding to last committed offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:409)
2023-06-15T14:19:03.971340639Z java.lang.RuntimeException: java.util.concurrent.ExecutionException: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Invalid data in message: Message failed schema validation.
2023-06-15T14:19:03.971372375Z 	at com.google.pubsub.kafka.sink.CloudPubSubSinkTask.flush(CloudPubSubSinkTask.java:347)
2023-06-15T14:19:03.971386314Z 	at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:125)
2023-06-15T14:19:03.971393094Z 	at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:404)
2023-06-15T14:19:03.971398732Z 	at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:374)
2023-06-15T14:19:03.971404374Z 	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:218)
2023-06-15T14:19:03.971409965Z 	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
2023-06-15T14:19:03.971414840Z 	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
2023-06-15T14:19:03.971419642Z 	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
2023-06-15T14:19:03.971424466Z 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2023-06-15T14:19:03.971429693Z 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2023-06-15T14:19:03.971434557Z 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2023-06-15T14:19:03.971439081Z 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2023-06-15T14:19:03.971443774Z 	at java.base/java.lang.Thread.run(Thread.java:829)
2023-06-15T14:19:03.971448940Z Caused by: java.util.concurrent.ExecutionException: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Invalid data in message: Message failed schema validation.
2023-06-15T14:19:03.971454003Z 	at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:588)
2023-06-15T14:19:03.971477053Z 	at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:547)
2023-06-15T14:19:03.971482768Z 	at com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:113)
2023-06-15T14:19:03.971490433Z 	at com.google.common.util.concurrent.ForwardingFuture.get(ForwardingFuture.java:66)
2023-06-15T14:19:03.971495828Z 	at com.google.pubsub.kafka.sink.CloudPubSubSinkTask.flush(CloudPubSubSinkTask.java:345)
2023-06-15T14:19:03.971500928Z 	... 12 more
2023-06-15T14:19:03.971505860Z Caused by: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Invalid data in message: Message failed schema validation.
2023-06-15T14:19:03.971511459Z 	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:92)
2023-06-15T14:19:03.971516728Z 	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:98)
2023-06-15T14:19:03.971557435Z 	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66)
2023-06-15T14:19:03.971571292Z 	at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
2023-06-15T14:19:03.971576787Z 	at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:67)
2023-06-15T14:19:03.971581325Z 	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1132)
2023-06-15T14:19:03.971648068Z 	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
2023-06-15T14:19:03.971654268Z 	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1270)
2023-06-15T14:19:03.971659263Z 	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1038)
2023-06-15T14:19:03.971663692Z 	at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:808)
2023-06-15T14:19:03.971668328Z 	at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:574)
2023-06-15T14:19:03.971687253Z 	at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:544)
2023-06-15T14:19:03.971693772Z 	at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
2023-06-15T14:19:03.971696819Z 	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
2023-06-15T14:19:03.971700104Z 	at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
2023-06-15T14:19:03.971704966Z 	at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:541)
2023-06-15T14:19:03.971710298Z 	at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:489)
2023-06-15T14:19:03.971715273Z 	at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:453)
2023-06-15T14:19:03.971732361Z 	at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:486)
2023-06-15T14:19:03.971740825Z 	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:576)
2023-06-15T14:19:03.971744216Z 	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
2023-06-15T14:19:03.971747150Z 	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:757)
2023-06-15T14:19:03.971750185Z 	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:736)
2023-06-15T14:19:03.971753106Z 	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
2023-06-15T14:19:03.971756012Z 	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
2023-06-15T14:19:03.971758846Z 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2023-06-15T14:19:03.971761676Z 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2023-06-15T14:19:03.971764526Z 	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
2023-06-15T14:19:03.971767486Z 	... 3 more
2023-06-15T14:19:03.971770336Z Caused by: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Invalid data in message: Message failed schema validation.
2023-06-15T14:19:03.971773264Z 	at io.grpc.Status.asRuntimeException(Status.java:539)
2023-06-15T14:19:03.971776331Z 	... 20 more
2023-06-15T14:19:03.971780978Z 

What could be causing this error?
