[SUPPORT] Schema conflict happens when async clustering in flink job. The plan contains a parquet file written by spark. #12561

Open
JoyJoyJo opened this issue Dec 31, 2024 · 12 comments
Labels
flink Issues related to flink table-service


JoyJoyJo commented Dec 31, 2024

Describe the problem you faced

I'm using a Flink job to append data to a COW table. When async clustering was triggered, the job threw the following exception:

[ERROR] 2025-01-01 01:04:46,869 method:org.apache.hudi.sink.utils.NonThrownExecutor.handleException(NonThrownExecutor.java:140)

Executor executes action [Execute clustering for instant 20250101010426430 from task 10] error
org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
at org.apache.hudi.common.util.MappingIterator.hasNext(MappingIterator.java:35)
at org.apache.hudi.common.util.MappingIterator.hasNext(MappingIterator.java:35)
at java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1811)
at java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:294)
at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:206)
at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:161)
at java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:300)
at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
at org.apache.hudi.client.utils.ConcatenatingIterator.hasNext(ConcatenatingIterator.java:45)
at org.apache.hudi.sink.clustering.ClusteringOperator.doClustering(ClusteringOperator.java:264)
at org.apache.hudi.sink.clustering.ClusteringOperator.lambda$processElement$0(ClusteringOperator.java:192)
at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/tmp/hudi/hudi_cow_sink3/1/6aed4a45-4272-4c3f-823d-a8dd34c1817c-0_0-5-330_20250101005816345.parquet
at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:254)
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
... 15 more
Caused by: org.apache.parquet.io.ParquetDecodingException: The requested schema is not compatible with the file schema. incompatible types: required int32 id != optional int32 id
at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.incompatibleSchema(ColumnIOFactory.java:101)
at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visitChildren(ColumnIOFactory.java:81)
at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visit(ColumnIOFactory.java:57)
at org.apache.parquet.schema.MessageType.accept(MessageType.java:55)
at org.apache.parquet.io.ColumnIOFactory.getColumnIO(ColumnIOFactory.java:162)
at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:135)
at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:225)
... 18 more

Before this, I had stopped the Flink job and run clustering once with the Spark run_clustering procedure, then restarted the Flink job. The latest clustering plan generated by the Flink job includes some files written by Spark.

To Reproduce

Steps to reproduce the behavior (locally):

  1. Start a Flink job to append some data to a COW table (with clustering.async.enabled turned off).
  2. Stop the Flink job and run clustering with the Spark run_clustering procedure.
  3. Restart the Flink job (with clustering.async.enabled and clustering.schedule.enabled turned on).
    In step 3, the clustering plan generated by the Flink job should contain some files written in step 2.

Expected behavior

Is this a bug? Any advice on how to resolve this conflict would be appreciated.

Environment Description

  • Hudi version : 0.13.1

  • Spark version : 2.4/3.4

  • Flink version: 1.16

Additional context

In the Parquet schema, the repetition of the primary key field id is REQUIRED in files written by Flink but OPTIONAL in files written by Spark.
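To make the failure mode concrete, here is a minimal sketch of the kind of check that rejects the read. It is a simplified model, not Parquet's actual ColumnIOFactory code: the class and method names are hypothetical, and the rule (a REQUIRED request over an OPTIONAL file column is rejected, the reverse is allowed) is an assumption inferred from the error message above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of a Parquet-style read check (hypothetical names).
final class RepetitionCheckSketch {
    enum Repetition { REQUIRED, OPTIONAL }

    // Assumption of this sketch: a read is rejected only when the requested
    // field is stricter (REQUIRED) than the field stored in the file (OPTIONAL).
    static boolean isReadable(Repetition requested, Repetition inFile) {
        return !(requested == Repetition.REQUIRED && inFile == Repetition.OPTIONAL);
    }

    // Returns the name of the first requested field that cannot be read, or null.
    static String firstIncompatibleField(Map<String, Repetition> requested,
                                         Map<String, Repetition> file) {
        for (Map.Entry<String, Repetition> e : requested.entrySet()) {
            Repetition inFile = file.get(e.getKey());
            if (inFile != null && !isReadable(e.getValue(), inFile)) {
                return e.getKey();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Requested schema as the Flink clustering reader sees the table.
        Map<String, Repetition> flinkRequested = new LinkedHashMap<>();
        flinkRequested.put("id", Repetition.REQUIRED);
        flinkRequested.put("name", Repetition.OPTIONAL);

        // Schema of the file produced by the Spark clustering run.
        Map<String, Repetition> sparkFile = new LinkedHashMap<>();
        sparkFile.put("id", Repetition.OPTIONAL);
        sparkFile.put("name", Repetition.OPTIONAL);

        System.out.println(firstIncompatibleField(flinkRequested, sparkFile));
    }
}
```

With these inputs the sketch reports id, the same field named in the ParquetDecodingException.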

Parquet schema written by Flink:

"schema" : {
"name" : "flink_schema",
"repetition" : "REPEATED",
"logicalTypeAnnotation" : null,
"id" : null,
"fields" : [ {
"name" : "_hoodie_commit_time",
"repetition" : "OPTIONAL",
"logicalTypeAnnotation" : { },
"id" : null,
"primitive" : "BINARY",
"length" : 0,
"decimalMeta" : null,
"columnOrder" : {
"columnOrderName" : "TYPE_DEFINED_ORDER"
}
}, {
"name" : "_hoodie_commit_seqno",
"repetition" : "OPTIONAL",
"logicalTypeAnnotation" : { },
"id" : null,
"primitive" : "BINARY",
"length" : 0,
"decimalMeta" : null,
"columnOrder" : {
"columnOrderName" : "TYPE_DEFINED_ORDER"
}
}, {
"name" : "_hoodie_record_key",
"repetition" : "OPTIONAL",
"logicalTypeAnnotation" : { },
"id" : null,
"primitive" : "BINARY",
"length" : 0,
"decimalMeta" : null,
"columnOrder" : {
"columnOrderName" : "TYPE_DEFINED_ORDER"
}
}, {
"name" : "_hoodie_partition_path",
"repetition" : "OPTIONAL",
"logicalTypeAnnotation" : { },
"id" : null,
"primitive" : "BINARY",
"length" : 0,
"decimalMeta" : null,
"columnOrder" : {
"columnOrderName" : "TYPE_DEFINED_ORDER"
}
}, {
"name" : "_hoodie_file_name",
"repetition" : "OPTIONAL",
"logicalTypeAnnotation" : { },
"id" : null,
"primitive" : "BINARY",
"length" : 0,
"decimalMeta" : null,
"columnOrder" : {
"columnOrderName" : "TYPE_DEFINED_ORDER"
}
}, {
"name" : "id",
"repetition" : "REQUIRED",
"logicalTypeAnnotation" : null,
"id" : null,
"primitive" : "INT32",
"length" : 0,
"decimalMeta" : null,
"columnOrder" : {
"columnOrderName" : "TYPE_DEFINED_ORDER"
}
}, {
"name" : "name",
"repetition" : "OPTIONAL",
"logicalTypeAnnotation" : { },
"id" : null,
"primitive" : "BINARY",
"length" : 0,
"decimalMeta" : null,
"columnOrder" : {
"columnOrderName" : "TYPE_DEFINED_ORDER"
}
}, {
"name" : "dt",
"repetition" : "OPTIONAL",
"logicalTypeAnnotation" : null,
"id" : null,
"primitive" : "INT32",
"length" : 0,
"decimalMeta" : null,
"columnOrder" : {
"columnOrderName" : "TYPE_DEFINED_ORDER"
}
}, {
"name" : "ts",
"repetition" : "OPTIONAL",
"logicalTypeAnnotation" : null,
"id" : null,
"primitive" : "INT64",
"length" : 0,
"decimalMeta" : null,
"columnOrder" : {
"columnOrderName" : "TYPE_DEFINED_ORDER"
}
}

Parquet schema written by Spark:

"schema" : {
"name" : "spark_schema",
"repetition" : "REPEATED",
"logicalTypeAnnotation" : null,
"id" : null,
"fields" : [ {
"name" : "_hoodie_commit_time",
"repetition" : "OPTIONAL",
"logicalTypeAnnotation" : { },
"id" : null,
"primitive" : "BINARY",
"length" : 0,
"decimalMeta" : null,
"columnOrder" : {
"columnOrderName" : "TYPE_DEFINED_ORDER"
}
}, {
"name" : "_hoodie_commit_seqno",
"repetition" : "OPTIONAL",
"logicalTypeAnnotation" : { },
"id" : null,
"primitive" : "BINARY",
"length" : 0,
"decimalMeta" : null,
"columnOrder" : {
"columnOrderName" : "TYPE_DEFINED_ORDER"
}
}, {
"name" : "_hoodie_record_key",
"repetition" : "OPTIONAL",
"logicalTypeAnnotation" : { },
"id" : null,
"primitive" : "BINARY",
"length" : 0,
"decimalMeta" : null,
"columnOrder" : {
"columnOrderName" : "TYPE_DEFINED_ORDER"
}
}, {
"name" : "_hoodie_partition_path",
"repetition" : "OPTIONAL",
"logicalTypeAnnotation" : { },
"id" : null,
"primitive" : "BINARY",
"length" : 0,
"decimalMeta" : null,
"columnOrder" : {
"columnOrderName" : "TYPE_DEFINED_ORDER"
}
}, {
"name" : "_hoodie_file_name",
"repetition" : "OPTIONAL",
"logicalTypeAnnotation" : { },
"id" : null,
"primitive" : "BINARY",
"length" : 0,
"decimalMeta" : null,
"columnOrder" : {
"columnOrderName" : "TYPE_DEFINED_ORDER"
}
}, {
"name" : "id",
"repetition" : "OPTIONAL",
"logicalTypeAnnotation" : null,
"id" : null,
"primitive" : "INT32",
"length" : 0,
"decimalMeta" : null,
"columnOrder" : {
"columnOrderName" : "TYPE_DEFINED_ORDER"
}
}, {
"name" : "name",
"repetition" : "OPTIONAL",
"logicalTypeAnnotation" : { },
"id" : null,
"primitive" : "BINARY",
"length" : 0,
"decimalMeta" : null,
"columnOrder" : {
"columnOrderName" : "TYPE_DEFINED_ORDER"
}
}, {
"name" : "dt",
"repetition" : "OPTIONAL",
"logicalTypeAnnotation" : null,
"id" : null,
"primitive" : "INT32",
"length" : 0,
"decimalMeta" : null,
"columnOrder" : {
"columnOrderName" : "TYPE_DEFINED_ORDER"
}
}, {
"name" : "ts",
"repetition" : "OPTIONAL",
"logicalTypeAnnotation" : null,
"id" : null,
"primitive" : "INT64",
"length" : 0,
"decimalMeta" : null,
"columnOrder" : {
"columnOrderName" : "TYPE_DEFINED_ORDER"
}
}
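The two dumps are long, so a field-by-field comparison helps. The sketch below is only an illustration: the field names and repetitions are copied from the dumps above, while the class and helper names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Compares two "field name -> repetition" views of the same table (hypothetical helper).
final class SchemaDiffSketch {
    // Builds the field list from the dumps above; only "id" differs per writer.
    static Map<String, String> tableSchema(String idRepetition) {
        Map<String, String> fields = new LinkedHashMap<>();
        String[] metaFields = {
            "_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
            "_hoodie_partition_path", "_hoodie_file_name"
        };
        for (String name : metaFields) {
            fields.put(name, "OPTIONAL");
        }
        fields.put("id", idRepetition);
        for (String name : new String[] {"name", "dt", "ts"}) {
            fields.put(name, "OPTIONAL");
        }
        return fields;
    }

    // Lists every field present on both sides whose repetition differs.
    static List<String> diffRepetition(Map<String, String> left, Map<String, String> right) {
        List<String> diffs = new ArrayList<>();
        for (Map.Entry<String, String> e : left.entrySet()) {
            String other = right.get(e.getKey());
            if (other != null && !other.equals(e.getValue())) {
                diffs.add(e.getKey() + ": " + e.getValue() + " vs " + other);
            }
        }
        return diffs;
    }

    public static void main(String[] args) {
        Map<String, String> flinkSchema = tableSchema("REQUIRED");
        Map<String, String> sparkSchema = tableSchema("OPTIONAL");
        System.out.println(diffRepetition(flinkSchema, sparkSchema));
    }
}
```

The only difference it reports is the id field, REQUIRED on the Flink side and OPTIONAL on the Spark side.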


@JoyJoyJo
Author

related to #8685 #8587

@JoyJoyJo
Author

@danny0405 PTAL

@danny0405
Contributor

It looks like the nullability of the field "id" has changed: in the Flink schema it is not nullable, but in the Spark schema it is nullable. By default, all Flink fields should be nullable. Did you declare anything specific in the table creation schema?

@JoyJoyJo
Author

JoyJoyJo commented Jan 1, 2025

Thanks for your reply.

Locally, I added a PRIMARY KEY NOT ENFORCED constraint on the field "id", and the Flink job failed because of the schema conflict. When I removed this constraint, the Flink job could run async clustering normally.

In the production environment, we use HMS as the Hudi catalog. The fields in the hoodie.datasource.write.recordkey.field option are automatically set as primary keys when the table is created, so these record key fields are all non-nullable in the schema.

It seems we cannot avoid the schema conflict between Flink and Spark when hoodie.datasource.write.recordkey.field is set.
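As a rough illustration of the behavior described above, the sketch below derives per-column nullability from a primary key list. It models only the rule stated in this comment (record key columns become non-nullable, everything else stays nullable); the class and method names are hypothetical and this is not Hudi or Flink catalog code.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model: primary-key columns are NOT NULL, all others are nullable.
final class PrimaryKeyNullabilitySketch {
    // Returns column name -> nullable flag, preserving column order.
    static Map<String, Boolean> nullability(List<String> columns, Set<String> primaryKeys) {
        Map<String, Boolean> result = new LinkedHashMap<>();
        for (String column : columns) {
            result.put(column, !primaryKeys.contains(column));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> columns = Arrays.asList("id", "name", "dt", "ts");
        Set<String> recordKeys = new HashSet<>(Arrays.asList("id"));
        // A non-nullable column is what ends up as REQUIRED in the Flink-written file.
        System.out.println(nullability(columns, recordKeys));
    }
}
```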

@danny0405
Contributor

For Flink append mode, there is no need to specify the record key fields, but for upsert operations, the record key field may have nullability discrepancies between Flink and Spark.

@ad1happy2go ad1happy2go added flink Issues related to flink table-service labels Jan 2, 2025
@ad1happy2go ad1happy2go moved this to 🏁 Triaged in Hudi Issue Support Jan 2, 2025
@JoyJoyJo
Author

JoyJoyJo commented Jan 4, 2025

I think I found the main cause. When Spark executes clustering with the row writer (hoodie.datasource.write.row.writer.enable = true), it reads records through HadoopFsRelation and passes a nullable schema. The code is shown below:

// Spark-sql_2.12-3.5.3
// DataSource(line: 414)

HadoopFsRelation(
        fileCatalog,
        partitionSchema = partitionSchema,
        dataSchema = dataSchema.asNullable,
        bucketSpec = bucketSpec,
        format,
        caseInsensitiveOptions)(sparkSession)

Spark then writes these records to Parquet files using the same schema.

// Hudi: master
// HoodieDatasetBulkInsertHelper(lines: 150)

def bulkInsert(dataset: Dataset[Row],
                 instantTime: String,
                 table: HoodieTable[_, _, _, _],
                 writeConfig: HoodieWriteConfig,
                 arePartitionRecordsSorted: Boolean,
                 shouldPreserveHoodieMetadata: Boolean): HoodieData[WriteStatus] = {
    val schema = dataset.schema
    HoodieJavaRDD.of(
      injectSQLConf(dataset.queryExecution.toRdd.mapPartitions(iter => {
        val taskContextSupplier: TaskContextSupplier = table.getTaskContextSupplier
        val taskPartitionId = taskContextSupplier.getPartitionIdSupplier.get
        val taskId = taskContextSupplier.getStageIdSupplier.get.toLong
        val taskEpochId = taskContextSupplier.getAttemptIdSupplier.get
        ...
  }

The row writer changes the schema when reading records, which leads to the schema conflict later.
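To show the effect of the asNullable call in isolation, here is a toy model of a schema tree where every field is relaxed to nullable on read. It is a sketch with hypothetical types, not Spark's StructType implementation, but it captures why a column declared non-nullable by Flink comes back nullable and is then written as OPTIONAL.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Toy schema tree; Field is a hypothetical stand-in for a real schema type.
final class AsNullableSketch {
    static final class Field {
        final String name;
        final boolean nullable;
        final List<Field> children;

        Field(String name, boolean nullable, List<Field> children) {
            this.name = name;
            this.nullable = nullable;
            this.children = children;
        }
    }

    static Field leaf(String name, boolean nullable) {
        return new Field(name, nullable, Collections.<Field>emptyList());
    }

    // Returns a copy of the tree in which every field, at every depth, is nullable.
    static Field asNullable(Field field) {
        List<Field> relaxed = new ArrayList<>();
        for (Field child : field.children) {
            relaxed.add(asNullable(child));
        }
        return new Field(field.name, true, relaxed);
    }

    public static void main(String[] args) {
        // "id" is non-nullable in the table definition, as on the Flink side.
        Field table = new Field("row", false,
            Arrays.asList(leaf("id", false), leaf("name", true)));
        Field readSchema = asNullable(table);
        Field id = readSchema.children.get(0);
        System.out.println(id.name + " nullable=" + id.nullable);
    }
}
```

Because the write path reuses the schema of the dataset it read, the relaxed nullability is what ends up in the clustered file.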

@JoyJoyJo
Author

JoyJoyJo commented Jan 4, 2025

BTW, I ran into a failure before when I used Spark to backfill some historical partition data into a COW table.

The table was originally appended to by Flink, and I did not specify a record key. By default, the Flink job fills the _hoodie_record_key metadata field with the placeholder __empty__. However, Spark cannot generate the _hoodie_record_key metadata field without a primary key or record key.

If the table is not only appended to by Flink but also processed by Spark or another engine, I think the record key is required.

@danny0405
Contributor

Spark pkless tables have been supported since the 0.14.x release. As for the schema nullability issue, can you submit a fix for it?

@cshuo
Contributor

cshuo commented Jan 7, 2025

@danny0405 The problem may not only exist in Spark clustering. For Spark writes, e.g. insert into pk_table, the underlying pk field is also written as nullable. I'm wondering whether we should make a schema reconciliation during flink reading a pk table, if it is common for a pk table to contain files generated by both Spark and Flink writes.

@danny0405
Contributor

danny0405 commented Jan 7, 2025

I'm wondering whether we should make a schema reconciliation during flink reading a pk table

I think we could. It looks like an easier fix.

@cshuo
Contributor

cshuo commented Jan 7, 2025

I'm wondering whether we should make a schema reconciliation during flink reading a pk table

I think we could. It looks like an easier fix.

OK, I can take a look at fixing it.
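One possible shape for the reconciliation discussed here, purely as a sketch: before reading a file, relax any requested field that is REQUIRED in the table schema but OPTIONAL in that file. All names below are hypothetical and this is not the actual fix; it only illustrates the idea under the assumption that relaxing the requested schema is enough for the reader to accept the file.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical reconciliation step applied per file before opening the reader.
final class NullabilityReconcileSketch {
    enum Repetition { REQUIRED, OPTIONAL }

    // Returns the requested schema with REQUIRED relaxed to OPTIONAL wherever
    // the file stores the same field as OPTIONAL. Other fields are unchanged.
    static Map<String, Repetition> reconcile(Map<String, Repetition> requested,
                                             Map<String, Repetition> file) {
        Map<String, Repetition> result = new LinkedHashMap<>();
        for (Map.Entry<String, Repetition> e : requested.entrySet()) {
            boolean relax = e.getValue() == Repetition.REQUIRED
                && file.get(e.getKey()) == Repetition.OPTIONAL;
            result.put(e.getKey(), relax ? Repetition.OPTIONAL : e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Repetition> requested = new LinkedHashMap<>();
        requested.put("id", Repetition.REQUIRED);
        requested.put("name", Repetition.OPTIONAL);

        Map<String, Repetition> sparkFile = new LinkedHashMap<>();
        sparkFile.put("id", Repetition.OPTIONAL);
        sparkFile.put("name", Repetition.OPTIONAL);

        System.out.println(reconcile(requested, sparkFile));
    }
}
```

A file written by Flink, where id is already REQUIRED, would pass through this step unchanged, so the same reader could handle files from both engines.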

@cshuo
Contributor

cshuo commented Jan 8, 2025
