Issue with WorkerSinkTask in Kafka Connect: Exception with Identifier Field in Iceberg Schema #49

Open
junsik-gsitm opened this issue Dec 18, 2023 · 1 comment

@junsik-gsitm

Hello,

I'm encountering an issue with the Kafka Connect Sink Connector, specifically when integrating with Apache Iceberg. The error is as follows:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: Cannot add field _id as an identifier field: not a primitive type field
    at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:220)
    at org.apache.iceberg.Schema.validateIdentifierField(Schema.java:122)
    at org.apache.iceberg.Schema.lambda$new$0(Schema.java:106)
    at java.base/java.lang.Iterable.forEach(Iterable.java:75)
    at org.apache.iceberg.Schema.<init>(Schema.java:106)
    at org.apache.iceberg.Schema.<init>(Schema.java:91)
    at org.apache.iceberg.Schema.<init>(Schema.java:83)
    at com.getindata.kafka.connect.iceberg.sink.IcebergChangeEvent$JsonSchema.icebergSchema(IcebergChangeEvent.java:347)
    at com.getindata.kafka.connect.iceberg.sink.IcebergChangeEvent.icebergSchema(IcebergChangeEvent.java:64)
    at com.getindata.kafka.connect.iceberg.sink.IcebergChangeConsumer.lambda$loadIcebergTable$0(IcebergChangeConsumer.java:68)
    at java.base/java.util.Optional.orElseGet(Optional.java:369)
    at com.getindata.kafka.connect.iceberg.sink.IcebergChangeConsumer.loadIcebergTable(IcebergChangeConsumer.java:64)
    at com.getindata.kafka.connect.iceberg.sink.IcebergChangeConsumer.accept(IcebergChangeConsumer.java:55)
    at com.getindata.kafka.connect.iceberg.sink.IcebergSinkTask.put(IcebergSinkTask.java:38)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
    ... 11 more

The complete error log is attached below. This exception suggests a problem with adding the _id field as an identifier in the Iceberg schema because it's not a primitive type.
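To illustrate what the message appears to mean, here is a minimal, dependency-free sketch of an "identifier fields must be primitive" check. It is not Iceberg's actual code: the class `IdentifierFieldCheck` and both helper methods are hypothetical, and the nested `{"$oid": ...}` shape for `_id` is only an assumption about how a MongoDB ObjectId might arrive in the record, not something the log confirms.

```java
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the failing check; not Iceberg's implementation.
class IdentifierFieldCheck {

    // Treat maps (structs) and lists as non-primitive; everything else as primitive.
    static boolean isPrimitive(Object value) {
        return !(value instanceof Map) && !(value instanceof List);
    }

    // Reject a non-primitive identifier field, mirroring the wording of the error in the log.
    static void validateIdentifierField(String name, Object sampleValue) {
        if (!isPrimitive(sampleValue)) {
            throw new IllegalArgumentException(
                "Cannot add field " + name + " as an identifier field: not a primitive type field");
        }
    }

    public static void main(String[] args) {
        // Assumed shape: _id delivered as a nested object rather than a plain string.
        Map<String, Object> nestedId = Map.of("$oid", "hypothetical-object-id");
        try {
            validateIdentifierField("_id", nestedId);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }

        // A flat string _id passes the same check.
        validateIdentifierField("_id", "hypothetical-object-id");
        System.out.println("string _id accepted");
    }
}
```

If the `_id` in the topic really is a nested structure, flattening it to a string before it reaches the sink would be one direction to investigate; whether that applies here depends on the actual record schema.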

I am using a specific JSON configuration for the connector (which I will attach below). Could you please help me understand why this error occurs? It seems that the _id field is not recognized as a valid identifier, although there doesn't seem to be anything unusual with its definition.

Attached: [Error Log and JSON Configuration]
{
  "name": "mongodb-iceberg-sink-getindata",
  "config": {
    "connector.class": "com.getindata.kafka.connect.iceberg.sink.IcebergSink",
    "topics": "test.mongodemo.tripNotification",
    "upsert": "true",
    "upsert.keep-deletes": "true",
    "table.auto-create": "true",
    "allow-field-addition": "true",
    "table.write-format": "parquet",
    "table.namespace": "jskim",
    "table.prefix": "debumcdc_",
    "iceberg.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
    "iceberg.warehouse": "s3a://jskim/sample-test",
    "iceberg.fs.defaultFS": "s3a://jskim/sample-test",
    "iceberg.com.amazonaws.services.s3.enableV4": "true",
    "iceberg.com.amazonaws.services.s3a.enableV4": "true",
    "iceberg.fs.s3a.aws.credentials.provider": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
    "iceberg.fs.s3a.path.style.access": "true",
    "iceberg.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "iceberg.fs.s3a.access.key": "",
    "iceberg.fs.s3a.secret.key": "+l/YxMb2pxJQ"
  }
}

Any insights or suggestions on resolving this issue would be greatly appreciated.

Thank you.

@utkanbir

Hi Junsik, I have a similar problem. What are your Iceberg settings?
