Description
I have a Kafka topic named source that contains Avro messages serialized with a schema from the schema registry at http://localhost:9080. The messages are produced with the code below and can be viewed in the Kafka UI (which reads them through the schema registry), so I believe the schema registry is working as expected.
ProducerRecord<String, AvroCustomMessage> record = new ProducerRecord<>("source", "test", customMessage);
kafkaTemplate.send(record);
However, when I set up a Pub/Sub sink connector from the source topic to cps.topic.test using the config below, I get the following stack trace.
Connector config
{
"name": "CONNECTOR_NAME",
"connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
"headers.publish": true,
"metadata.publish": true,
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": false,
"key.converter.schemas.registry.url": "http://localhost:9081",
"key.converter.enhanced.avro.schema.support": true,
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter.schemas.enable": false,
"value.converter.schemas.registry.url":" http://localhost:9081",
"value.converter.enhanced.avro.schema.support": true,
"tasks.max": "1",
"topics": "source",
"cps.topic": "cps.topic.test",
"cps.project": "cps.project.test",
"cps.endpoint": "pubsub.googleapis.com:443",
"gcp.credentials.file.path": "path/to/file.json"
}
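With ByteArrayConverter as the value converter, the record value is handed to the sink as the raw bytes stored in Kafka. One thing that may be worth checking is what those bytes actually look like. The sketch below assumes (this is not confirmed by the setup above) that the producer uses a Confluent-style Avro serializer, whose wire format puts a 1-byte magic value of 0 and a 4-byte big-endian schema ID in front of the Avro binary payload. The class and method names are hypothetical, and the sample bytes are made up for illustration.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper for inspecting a Kafka record value. Assumes the
// Confluent wire format: [magic byte 0][4-byte schema ID][Avro binary].
public class WireFormatCheck {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 5; // 1 magic byte + 4 schema ID bytes

    // Heuristic only: a plain Avro payload could also start with a 0 byte.
    static boolean hasConfluentHeader(byte[] value) {
        return value != null && value.length >= HEADER_LENGTH && value[0] == MAGIC_BYTE;
    }

    // Reads the schema ID stored big-endian in bytes 1..4.
    static int schemaId(byte[] value) {
        if (!hasConfluentHeader(value)) {
            throw new IllegalArgumentException("value has no Confluent-style header");
        }
        return ByteBuffer.wrap(value, 1, 4).getInt();
    }

    // Returns the bytes after the 5-byte header, i.e. the Avro binary body.
    static byte[] avroPayload(byte[] value) {
        if (!hasConfluentHeader(value)) {
            throw new IllegalArgumentException("value has no Confluent-style header");
        }
        return Arrays.copyOfRange(value, HEADER_LENGTH, value.length);
    }

    public static void main(String[] args) {
        // Made-up sample: header with schema ID 7, then the Avro string "test".
        byte[] sample = {0, 0, 0, 0, 7, 8, 116, 101, 115, 116};
        System.out.println("header present: " + hasConfluentHeader(sample));
        System.out.println("schema id: " + schemaId(sample));
        System.out.println("payload length: " + avroPayload(sample).length);
    }
}
```

If the values on the source topic do carry such a header, the bytes published to Pub/Sub would not be plain Avro binary, which could be one reason a topic with an attached schema rejects them. This is only a possible explanation to verify, not a confirmed diagnosis.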
Error
2023-06-15T14:19:03.967837839Z [2023-06-15 14:19:03,967] DEBUG [CONNECTOR_NAME|task-0] Flushing... (com.google.pubsub.kafka.sink.CloudPubSubSinkTask:329)
2023-06-15T14:19:03.967846553Z [2023-06-15 14:19:03,967] TRACE [CONNECTOR_NAME|task-0] Received flush for partition source-0 (com.google.pubsub.kafka.sink.CloudPubSubSinkTask:333)
2023-06-15T14:19:03.971307523Z [2023-06-15 14:19:03,968] ERROR [CONNECTOR_NAME|task-0] WorkerSinkTask{id=CONNECTOR_NAME-0} Offset commit failed, rewinding to last committed offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:409)
2023-06-15T14:19:03.971340639Z java.lang.RuntimeException: java.util.concurrent.ExecutionException: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Invalid data in message: Message failed schema validation.
2023-06-15T14:19:03.971372375Z at com.google.pubsub.kafka.sink.CloudPubSubSinkTask.flush(CloudPubSubSinkTask.java:347)
2023-06-15T14:19:03.971386314Z at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:125)
2023-06-15T14:19:03.971393094Z at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:404)
2023-06-15T14:19:03.971398732Z at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:374)
2023-06-15T14:19:03.971404374Z at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:218)
2023-06-15T14:19:03.971409965Z at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
2023-06-15T14:19:03.971414840Z at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
2023-06-15T14:19:03.971419642Z at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
2023-06-15T14:19:03.971424466Z at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2023-06-15T14:19:03.971429693Z at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2023-06-15T14:19:03.971434557Z at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2023-06-15T14:19:03.971439081Z at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2023-06-15T14:19:03.971443774Z at java.base/java.lang.Thread.run(Thread.java:829)
2023-06-15T14:19:03.971448940Z Caused by: java.util.concurrent.ExecutionException: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Invalid data in message: Message failed schema validation.
2023-06-15T14:19:03.971454003Z at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:588)
2023-06-15T14:19:03.971477053Z at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:547)
2023-06-15T14:19:03.971482768Z at com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:113)
2023-06-15T14:19:03.971490433Z at com.google.common.util.concurrent.ForwardingFuture.get(ForwardingFuture.java:66)
2023-06-15T14:19:03.971495828Z at com.google.pubsub.kafka.sink.CloudPubSubSinkTask.flush(CloudPubSubSinkTask.java:345)
2023-06-15T14:19:03.971500928Z ... 12 more
2023-06-15T14:19:03.971505860Z Caused by: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Invalid data in message: Message failed schema validation.
2023-06-15T14:19:03.971511459Z at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:92)
2023-06-15T14:19:03.971516728Z at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:98)
2023-06-15T14:19:03.971557435Z at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66)
2023-06-15T14:19:03.971571292Z at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
2023-06-15T14:19:03.971576787Z at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:67)
2023-06-15T14:19:03.971581325Z at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1132)
2023-06-15T14:19:03.971648068Z at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
2023-06-15T14:19:03.971654268Z at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1270)
2023-06-15T14:19:03.971659263Z at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1038)
2023-06-15T14:19:03.971663692Z at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:808)
2023-06-15T14:19:03.971668328Z at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:574)
2023-06-15T14:19:03.971687253Z at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:544)
2023-06-15T14:19:03.971693772Z at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
2023-06-15T14:19:03.971696819Z at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
2023-06-15T14:19:03.971700104Z at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
2023-06-15T14:19:03.971704966Z at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:541)
2023-06-15T14:19:03.971710298Z at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:489)
2023-06-15T14:19:03.971715273Z at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:453)
2023-06-15T14:19:03.971732361Z at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:486)
2023-06-15T14:19:03.971740825Z at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:576)
2023-06-15T14:19:03.971744216Z at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
2023-06-15T14:19:03.971747150Z at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:757)
2023-06-15T14:19:03.971750185Z at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:736)
2023-06-15T14:19:03.971753106Z at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
2023-06-15T14:19:03.971756012Z at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
2023-06-15T14:19:03.971758846Z at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2023-06-15T14:19:03.971761676Z at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2023-06-15T14:19:03.971764526Z at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
2023-06-15T14:19:03.971767486Z ... 3 more
2023-06-15T14:19:03.971770336Z Caused by: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Invalid data in message: Message failed schema validation.
2023-06-15T14:19:03.971773264Z at io.grpc.Status.asRuntimeException(Status.java:539)
2023-06-15T14:19:03.971776331Z ... 20 more
2023-06-15T14:19:03.971780978Z
What could be causing this error?