[SUPPORT] Schema conflict happens when async clustering in flink job. The plan contains a parquet file written by spark. #12561
Comments
@danny0405 PTAL |
It looks like the nullability of the field "id" differs: it is not nullable in the Flink schema, but it is nullable in the Spark schema. By default all Flink fields should be nullable. Did you declare it specifically in the table creation schema? |
Thanks for your reply. Locally, I added a In production environment, we use hms as hudi catalog. The fields in It seems like we can not avoid schema conflict between flink and spark when |
For Flink append mode there is no need to specify the record key fields, but for the upsert operation the record key field may have nullability discrepancies between Flink and Spark. |
I think I found the main cause. When Spark executed clustering using row writer (
// Spark-sql_2.12-3.5.3
// DataSource(line: 414)
HadoopFsRelation(
  fileCatalog,
  partitionSchema = partitionSchema,
  dataSchema = dataSchema.asNullable,
  bucketSpec = bucketSpec,
  format,
  caseInsensitiveOptions)(sparkSession)
And Spark saves these records into parquet files using the same schema.
// Hudi: master
// HoodieDatasetBulkInsertHelper(lines: 150)
def bulkInsert(dataset: Dataset[Row],
               instantTime: String,
               table: HoodieTable[_, _, _, _],
               writeConfig: HoodieWriteConfig,
               arePartitionRecordsSorted: Boolean,
               shouldPreserveHoodieMetadata: Boolean): HoodieData[WriteStatus] = {
  val schema = dataset.schema
  HoodieJavaRDD.of(
    injectSQLConf(dataset.queryExecution.toRdd.mapPartitions(iter => {
      val taskContextSupplier: TaskContextSupplier = table.getTaskContextSupplier
      val taskPartitionId = taskContextSupplier.getPartitionIdSupplier.get
      val taskId = taskContextSupplier.getStageIdSupplier.get.toLong
      val taskEpochId = taskContextSupplier.getAttemptIdSupplier.get
      ...
}
The row writer changed the schema when reading the records, which led to the schema conflict later. |
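To make the effect described above concrete, here is a minimal sketch that needs no Spark dependency. `SketchField`, `AsNullableSketch`, and the second column `name` are hypothetical names used only for illustration; the real code path goes through Spark's `StructType`, and the model here only assumes that a nullable field is written as `OPTIONAL` and a non-nullable one as `REQUIRED`.

```scala
// Hypothetical stand-in for a schema field: only the name and the nullable flag matter here.
final case class SketchField(name: String, nullable: Boolean)

object AsNullableSketch {
  // Models the effect of `dataSchema.asNullable`: every field is marked nullable.
  def asNullable(schema: Seq[SketchField]): Seq[SketchField] =
    schema.map(_.copy(nullable = true))

  // Parquet repetition a writer would derive from the nullable flag.
  def repetition(field: SketchField): String =
    if (field.nullable) "OPTIONAL" else "REQUIRED"

  // Fields whose repetition differs between two schemas with the same columns.
  def conflicts(a: Seq[SketchField], b: Seq[SketchField]): Seq[String] =
    a.zip(b).collect { case (x, y) if x.name == y.name && x.nullable != y.nullable => x.name }
}
```

With a Flink-side schema where `id` is non-nullable, passing it through `asNullable` yields a schema that differs only in the repetition of `id`, which is the kind of mismatch reported in this issue.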
BTW, I encountered a failed case before when I used Spark to backfill some historical partition data into a COW table. The table was originally appended to by Flink and I did not specify the record key. By default, If the table is not only appended to by Flink but also needs Spark or another engine to process it, I think the record key is required. |
Spark pkless tables have been supported since the 0.14.x release. As for the schema nullability issue, can you file a fix for it? |
@danny0405 The problem may not be limited to Spark clustering: for Spark writes, e.g. insert into pk_table, the underlying pk field is also nullable. I'm wondering whether we should do schema reconciliation when Flink reads a pk table, if it is common for a pk table to contain files generated by both Spark and Flink writes. |
I think we could. It looks like an easier fix. |
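A rough sketch of what read-side reconciliation could look like, purely as an illustration and not the actual change tracked in HUDI-8841. `Col`, `ReconcileSketch`, and `reconcile` are hypothetical names; the assumption is that the reader widens a column to nullable whenever either the table schema or the file schema marks it nullable, instead of failing on the mismatch.

```scala
object ReconcileSketch {
  // Hypothetical column model: name plus nullability only.
  final case class Col(name: String, nullable: Boolean)

  // Reconciles the table (reader) schema with a file schema.
  // A column is nullable in the result if it is nullable on either side;
  // columns missing from the file are reported instead of silently skipped.
  def reconcile(table: Seq[Col], file: Seq[Col]): Either[String, Seq[Col]] = {
    val fileByName = file.map(c => c.name -> c).toMap
    val missing = table.map(_.name).filterNot(fileByName.contains)
    if (missing.nonEmpty) Left(s"columns missing from file: ${missing.mkString(", ")}")
    else Right(table.map(c => c.copy(nullable = c.nullable || fileByName(c.name).nullable)))
  }
}
```

Widening to nullable is the permissive direction: a reader expecting `OPTIONAL` can consume values written as `REQUIRED`, while the reverse would need a null check on every record.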
Ok, I can take a look at the fix. |
jira created: https://issues.apache.org/jira/browse/HUDI-8841 cc @danny0405 |
Describe the problem you faced
I'm using a Flink job to append data into a COW table. When async clustering was triggered, the job threw an exception as below:
Previously, I had stopped the Flink job and executed clustering once with the Spark run_clustering procedure. Then I restarted the Flink job. The latest clustering plan generated by the Flink job contains some files written by Spark.
To Reproduce
Steps to reproduce the behavior (locally):
run_clustering procedure
In step 3, the clustering plan generated by the Flink job should contain some files written in step 2.
Expected behavior
Is this a bug? Any advice on how to resolve this conflict would help.
Environment Description
Hudi version : 0.13.1
Spark version : 2.4/3.4
Flink version: 1.16
Additional context
The repetition property of the primary key field id in the parquet schema is REQUIRED when written by Flink but OPTIONAL when written by Spark.
parquet schema written by flink:
parquet schema written by spark:
Stacktrace
Add the stacktrace of the error.