[HUDI-8880] Fix hive scan exception after new column is added (#12659)
cshuo authored Jan 21, 2025
1 parent 696683f commit 9969b44
Showing 4 changed files with 93 additions and 7 deletions.
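What the change does, in brief: with schema-on-read enabled, adding a column leaves older parquet files without that column. The Hive read path used to build its Avro read schema from the parquet footer alone, so projecting the new column made the scan throw. The diff below threads the evolved query schema, as an Option<InternalSchema> taken from SchemaEvolutionContext, through HoodieTimestampAwareParquetInputFormat into HoodieAvroParquetReader, which reconciles it with the file schema so the missing column is read back as null (verified by the new test at the end).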
@@ -19,11 +19,13 @@
package org.apache.hudi.hadoop;

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.avro.HoodieTimestampAwareParquetInputFormat;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;

@@ -123,7 +125,7 @@ public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit
if (supportAvroRead && HoodieColumnProjectionUtils.supportTimestamp(job)) {
return new HoodieFileGroupReaderBasedRecordReader((s, j) -> {
try {
return new ParquetRecordReaderWrapper(new HoodieTimestampAwareParquetInputFormat(), s, j, reporter);
return new ParquetRecordReaderWrapper(new HoodieTimestampAwareParquetInputFormat(Option.empty()), s, j, reporter);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
@@ -152,21 +154,29 @@ public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit
}

// adapt schema evolution
new SchemaEvolutionContext(split, job).doEvolutionForParquetFormat();
SchemaEvolutionContext schemaEvolutionContext = new SchemaEvolutionContext(split, job);
schemaEvolutionContext.doEvolutionForParquetFormat();

if (LOG.isDebugEnabled()) {
LOG.debug("EMPLOYING DEFAULT RECORD READER - " + split);
}

return getRecordReaderInternal(split, job, reporter);
return getRecordReaderInternal(split, job, reporter, schemaEvolutionContext.internalSchemaOption);
}

private RecordReader<NullWritable, ArrayWritable> getRecordReaderInternal(InputSplit split,
JobConf job,
Reporter reporter) throws IOException {
return getRecordReaderInternal(split, job, reporter, Option.empty());
}

private RecordReader<NullWritable, ArrayWritable> getRecordReaderInternal(InputSplit split,
JobConf job,
Reporter reporter,
Option<InternalSchema> internalSchemaOption) throws IOException {
try {
if (supportAvroRead && HoodieColumnProjectionUtils.supportTimestamp(job)) {
return new ParquetRecordReaderWrapper(new HoodieTimestampAwareParquetInputFormat(), split, job, reporter);
return new ParquetRecordReaderWrapper(new HoodieTimestampAwareParquetInputFormat(internalSchemaOption), split, job, reporter);
} else {
return super.getRecordReader(split, job, reporter);
}
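Condensed, the change to this input format looks as follows (a sketch only; exception handling and the surrounding class are omitted). The schema discovered by SchemaEvolutionContext is now passed to the timestamp-aware Parquet input format on the avro/timestamp branch instead of being dropped:

SchemaEvolutionContext schemaEvolutionContext = new SchemaEvolutionContext(split, job);
schemaEvolutionContext.doEvolutionForParquetFormat();
// expected to be empty when schema-on-read is not in play, preserving the old behavior
Option<InternalSchema> internalSchemaOption = schemaEvolutionContext.internalSchemaOption;
return new ParquetRecordReaderWrapper(
    new HoodieTimestampAwareParquetInputFormat(internalSchemaOption), split, job, reporter);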
@@ -19,6 +19,7 @@
package org.apache.hudi.hadoop.avro;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.HoodieColumnProjectionUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;

@@ -30,6 +31,9 @@
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
@@ -50,13 +54,23 @@ public class HoodieAvroParquetReader extends RecordReader<Void, ArrayWritable> {
private final ParquetRecordReader<GenericData.Record> parquetRecordReader;
private Schema baseSchema;

public HoodieAvroParquetReader(InputSplit inputSplit, Configuration conf) throws IOException {
public HoodieAvroParquetReader(InputSplit inputSplit, Configuration conf, Option<InternalSchema> internalSchemaOption) throws IOException {
// get base schema
ParquetMetadata fileFooter =
ParquetFileReader.readFooter(conf, ((ParquetInputSplit) inputSplit).getPath(), ParquetMetadataConverter.NO_FILTER);
MessageType messageType = fileFooter.getFileMetaData().getSchema();
baseSchema = new AvroSchemaConverter(conf).convert(messageType);

if (internalSchemaOption.isPresent()) {
// do schema reconciliation in case there are read columns that are not in the file schema.
InternalSchema mergedInternalSchema = new InternalSchemaMerger(
AvroInternalSchemaConverter.convert(baseSchema),
internalSchemaOption.get(),
true,
true).mergeSchema();
baseSchema = AvroInternalSchemaConverter.convert(mergedInternalSchema, baseSchema.getFullName());
}

// if read columns exist, we need to filter columns.
List<String> readColNames = Arrays.asList(HoodieColumnProjectionUtils.getReadColumnNames(conf));
if (!readColNames.isEmpty()) {
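This is the heart of the fix: before the projected read columns are filtered, the footer schema is widened with the query schema, so a column added after the file was written still appears in the Avro read schema. A rough, self-contained illustration of that reconciliation follows; the class name, the record "trip" and its fields are made up for the example, and in the real path the query InternalSchema comes from the table's schema history via SchemaEvolutionContext rather than from an ad-hoc Avro conversion:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;

public class SchemaReconciliationSketch {
  public static void main(String[] args) {
    // Schema the parquet file was written with, before the ALTER TABLE.
    Schema fileSchema = SchemaBuilder.record("trip").fields()
        .requiredInt("col0")
        .requiredString("col2")
        .endRecord();
    // Evolved query schema after something like "alter table ... add columns (col4 string)".
    Schema querySchema = SchemaBuilder.record("trip").fields()
        .requiredInt("col0")
        .requiredString("col2")
        .optionalString("col4")
        .endRecord();

    // Same merge as in HoodieAvroParquetReader above: file schema first, query schema second,
    // flags as used in the change.
    InternalSchema merged = new InternalSchemaMerger(
        AvroInternalSchemaConverter.convert(fileSchema),
        AvroInternalSchemaConverter.convert(querySchema),
        true,
        true).mergeSchema();

    // The resulting Avro read schema now contains col4; parquet-avro returns null for it
    // when reading files that predate the new column.
    Schema readSchema = AvroInternalSchemaConverter.convert(merged, fileSchema.getFullName());
    System.out.println(readSchema.toString(true));
  }
}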
@@ -23,6 +23,8 @@
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.util.ContextUtil;

@@ -33,12 +35,18 @@
* we need to handle timestamp types separately based on the parquet-avro approach.
*/
public class HoodieTimestampAwareParquetInputFormat extends ParquetInputFormat<ArrayWritable> {
private final Option<InternalSchema> internalSchemaOption;

public HoodieTimestampAwareParquetInputFormat(Option<InternalSchema> internalSchemaOption) {
super();
this.internalSchemaOption = internalSchemaOption;
}

@Override
public RecordReader<Void, ArrayWritable> createRecordReader(
InputSplit inputSplit,
TaskAttemptContext taskAttemptContext) throws IOException {
Configuration conf = ContextUtil.getConfiguration(taskAttemptContext);
return new HoodieAvroParquetReader(inputSplit, conf);
return new HoodieAvroParquetReader(inputSplit, conf, internalSchemaOption);
}
}
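The input format itself only carries the optional schema to every reader it creates. For illustration, with querySchema standing in for a hypothetical InternalSchema obtained from SchemaEvolutionContext:

// behaves like the old no-arg constructor: no reconciliation is attempted
new HoodieTimestampAwareParquetInputFormat(Option.empty());
// schema-on-read enabled: each HoodieAvroParquetReader reconciles the file schema against querySchema
new HoodieTimestampAwareParquetInputFormat(Option.of(querySchema));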
@@ -28,6 +28,7 @@
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
Expand Down Expand Up @@ -55,6 +56,8 @@

import static org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

@Tag("functional")
@@ -198,13 +201,64 @@ public void testHiveReadSchemaEvolutionTable(String tableType) throws Exception
recordReader.close();
}

@ParameterizedTest
@ValueSource(strings = {"mor", "cow"})
public void testHiveReadSchemaEvolutionWithAddingColumns(String tableType) throws Exception {
String tableName = "hudi_test" + new Date().getTime();
String path = new Path(basePath.toAbsolutePath().toString()).toUri().toString();

spark.sql("set hoodie.schema.on.read.enable=true");

spark.sql(String.format("create table %s (col0 int, col1 float, col2 string, col3 timestamp) using hudi "
+ "tblproperties (type='%s', primaryKey='col0', preCombineField='col1', "
+ "hoodie.compaction.payload.class='org.apache.hudi.common.model.OverwriteWithLatestAvroPayload') location '%s'",
tableName, tableType, path));
spark.sql(String.format("insert into %s values(1, 1.1, 'text', timestamp('2021-12-25 12:01:01'))", tableName));
spark.sql(String.format("update %s set col2 = 'text2' where col0 = 1", tableName));
spark.sql(String.format("alter table %s add columns (col4 string)", tableName));

JobConf jobConf = new JobConf();
jobConf.set(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), "true");
jobConf.set(ColumnProjectionUtils.READ_ALL_COLUMNS, "false");
jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "col1,col2,col3,col4");
jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "6,7,8,9");
jobConf.set(serdeConstants.LIST_COLUMNS, "_hoodie_commit_time,_hoodie_commit_seqno,"
+ "_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,col0,col1,col2,col3,col4");
jobConf.set(serdeConstants.LIST_COLUMN_TYPES, "string,string,string,string,string,int,float,string,timestamp,string,string");
FileInputFormat.setInputPaths(jobConf, path);

HoodieParquetInputFormat inputFormat =
tableType.equals("cow") ? new HoodieParquetInputFormat() : new HoodieParquetRealtimeInputFormat();
inputFormat.setConf(jobConf);

InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
int expectedSplits = 1;
assertEquals(expectedSplits, splits.length);

RecordReader<NullWritable, ArrayWritable> recordReader = inputFormat.getRecordReader(splits[0], jobConf, null);
List<List<Writable>> records = getWritableList(recordReader, false);
assertEquals(1, records.size());
List<Writable> record1 = records.get(0);
assertEquals(10, record1.size());
assertEquals(new FloatWritable(1.1f), record1.get(6));
assertEquals(new Text("text2"), record1.get(7));
assertInstanceOf(TimestampWritable.class, record1.get(8));
// field 9 is the newly added column without any inserted values.
assertNull(record1.get(9));
recordReader.close();
}
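A note on the test setup: serdeConstants.LIST_COLUMNS places the five Hudi meta columns ahead of col0..col4, so the projected column ids 6,7,8,9 select col1,col2,col3,col4. The reader is drained with getWritableList(recordReader, false) so that null cells are kept, which lets the test assert that position 9, the freshly added col4 that no row has ever written, comes back as null instead of failing the scan.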

private List<List<Writable>> getWritableList(RecordReader<NullWritable, ArrayWritable> recordReader) throws IOException {
return getWritableList(recordReader, true);
}

private List<List<Writable>> getWritableList(RecordReader<NullWritable, ArrayWritable> recordReader, boolean filterNull) throws IOException {
List<List<Writable>> records = new ArrayList<>();
NullWritable key = recordReader.createKey();
ArrayWritable writable = recordReader.createValue();
while (writable != null && recordReader.next(key, writable)) {
records.add(Arrays.stream(writable.get())
.filter(Objects::nonNull)
.filter(f -> !filterNull || Objects.nonNull(f))
.collect(Collectors.toList()));
}
return records;