I'm encountering an issue with the Kafka Connect Sink Connector, specifically when integrating with Apache Iceberg. The error is as follows:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: Cannot add field _id as an identifier field: not a primitive type field
    at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:220)
    at org.apache.iceberg.Schema.validateIdentifierField(Schema.java:122)
    at org.apache.iceberg.Schema.lambda$new$0(Schema.java:106)
    at java.base/java.lang.Iterable.forEach(Iterable.java:75)
    at org.apache.iceberg.Schema.<init>(Schema.java:106)
    at org.apache.iceberg.Schema.<init>(Schema.java:91)
    at org.apache.iceberg.Schema.<init>(Schema.java:83)
    at com.getindata.kafka.connect.iceberg.sink.IcebergChangeEvent$JsonSchema.icebergSchema(IcebergChangeEvent.java:347)
    at com.getindata.kafka.connect.iceberg.sink.IcebergChangeEvent.icebergSchema(IcebergChangeEvent.java:64)
    at com.getindata.kafka.connect.iceberg.sink.IcebergChangeConsumer.lambda$loadIcebergTable$0(IcebergChangeConsumer.java:68)
    at java.base/java.util.Optional.orElseGet(Optional.java:369)
    at com.getindata.kafka.connect.iceberg.sink.IcebergChangeConsumer.loadIcebergTable(IcebergChangeConsumer.java:64)
    at com.getindata.kafka.connect.iceberg.sink.IcebergChangeConsumer.accept(IcebergChangeConsumer.java:55)
    at com.getindata.kafka.connect.iceberg.sink.IcebergSinkTask.put(IcebergSinkTask.java:38)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
    ... 11 more
The complete error log is attached below. This exception suggests a problem with adding the _id field as an identifier in the Iceberg schema because it's not a primitive type.
I am using a specific JSON configuration for the connector (which I will attach below). Could you please help me understand why this error occurs? It seems that the _id field is not recognized as a valid identifier, although there doesn't seem to be anything unusual with its definition.
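To make the rule in the exception message concrete, here is a minimal sketch of the kind of check that appears to be failing. It is a simplified, hypothetical model in plain Java: the `IdentifierFieldCheck` class and its `Kind` enum are made up for illustration and are not Iceberg's API. It also assumes, which the log does not confirm, that the MongoDB `_id` reaches the sink as a nested structure (for example an object wrapping the ObjectId) rather than as a plain string.

```java
import java.util.Map;

public class IdentifierFieldCheck {
    // Hypothetical stand-in for a column type; not Iceberg's Type hierarchy.
    public enum Kind { STRING, LONG, STRUCT }

    // Mirrors only the rule named in the exception: an identifier field must be primitive.
    public static void validateIdentifierField(String name, Map<String, Kind> columns) {
        Kind kind = columns.get(name);
        if (kind == null) {
            throw new IllegalArgumentException(
                "Cannot add field " + name + " as an identifier field: field does not exist");
        }
        if (kind == Kind.STRUCT) {
            throw new IllegalArgumentException(
                "Cannot add field " + name + " as an identifier field: not a primitive type field");
        }
    }

    // Returns "ok" when the field is accepted, otherwise the rejection message.
    public static String check(String name, Map<String, Kind> columns) {
        try {
            validateIdentifierField(name, columns);
            return "ok";
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Assumed shape: _id inferred as a nested object, so it is rejected.
        System.out.println(check("_id", Map.of("_id", Kind.STRUCT)));
        // Assumed shape: _id flattened to a string before reaching the sink, so it is accepted.
        System.out.println(check("_id", Map.of("_id", Kind.STRING)));
    }
}
```

If that assumption holds, the key would need to arrive as a primitive value (for instance a string) before the table is auto-created; whether the connector offers a setting for that is not something I can tell from the configuration.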
Attached: [Error Log and JSON Configuration]
{
  "name": "mongodb-iceberg-sink-getindata",
  "config": {
    "connector.class": "com.getindata.kafka.connect.iceberg.sink.IcebergSink",
    "topics": "test.mongodemo.tripNotification",
    "upsert": "true",
    "upsert.keep-deletes": "true",
    "table.auto-create": "true",
    "allow-field-addition": "true",
    "table.write-format": "parquet",
    "table.namespace": "jskim",
    "table.prefix": "debumcdc_",
    "iceberg.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
    "iceberg.warehouse": "s3a://jskim/sample-test",
    "iceberg.fs.defaultFS": "s3a://jskim/sample-test",
    "iceberg.com.amazonaws.services.s3.enableV4": "true",
    "iceberg.com.amazonaws.services.s3a.enableV4": "true",
    "iceberg.fs.s3a.aws.credentials.provider": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
    "iceberg.fs.s3a.path.style.access": "true",
    "iceberg.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "iceberg.fs.s3a.access.key": "",
    "iceberg.fs.s3a.secret.key": "+l/YxMb2pxJQ"
  }
}
Any insights or suggestions on resolving this issue would be greatly appreciated.
Thank you.