DRILL-8492: Read Parquet Microsecond Columns as Bigint (#2907)
handmadecode authored Jul 31, 2024
1 parent 9403a5e commit 1b7569d
Showing 12 changed files with 373 additions and 74 deletions.
@@ -415,6 +415,12 @@ private ExecConstants() {
public static final String PARQUET_READER_INT96_AS_TIMESTAMP = "store.parquet.reader.int96_as_timestamp";
public static final OptionValidator PARQUET_READER_INT96_AS_TIMESTAMP_VALIDATOR = new BooleanValidator(PARQUET_READER_INT96_AS_TIMESTAMP,
new OptionDescription("Enables Drill to implicitly interpret the INT96 timestamp data type in Parquet files."));
public static final String PARQUET_READER_TIME_MICROS_AS_INT64 = "store.parquet.reader.time_micros_as_int64";
public static final OptionValidator PARQUET_READER_TIME_MICROS_AS_INT64_VALIDATOR = new BooleanValidator(PARQUET_READER_TIME_MICROS_AS_INT64,
new OptionDescription("Enables Drill to implicitly interpret the TIME_MICROS data type in Parquet files as 64-bit integers instead of SQL times truncated to milliseconds."));
public static final String PARQUET_READER_TIMESTAMP_MICROS_AS_INT64 = "store.parquet.reader.timestamp_micros_as_int64";
public static final OptionValidator PARQUET_READER_TIMESTAMP_MICROS_AS_INT64_VALIDATOR = new BooleanValidator(PARQUET_READER_TIMESTAMP_MICROS_AS_INT64,
new OptionDescription("Enables Drill to implicitly interpret the TIMESTAMP_MICROS data type in Parquet files as 64-bit integers instead of SQL timestamps truncated to milliseconds."));

public static final String PARQUET_READER_STRINGS_SIGNED_MIN_MAX = "store.parquet.reader.strings_signed_min_max";
public static final StringValidator PARQUET_READER_STRINGS_SIGNED_MIN_MAX_VALIDATOR = new EnumeratedStringValidator(PARQUET_READER_STRINGS_SIGNED_MIN_MAX,
@@ -183,6 +183,8 @@ public static CaseInsensitiveMap<OptionDefinition> createDefaultOptionDefinition
new OptionDefinition(ExecConstants.PARQUET_PAGEREADER_BUFFER_SIZE_VALIDATOR),
new OptionDefinition(ExecConstants.PARQUET_PAGEREADER_USE_FADVISE_VALIDATOR),
new OptionDefinition(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP_VALIDATOR),
new OptionDefinition(ExecConstants.PARQUET_READER_TIME_MICROS_AS_INT64_VALIDATOR),
new OptionDefinition(ExecConstants.PARQUET_READER_TIMESTAMP_MICROS_AS_INT64_VALIDATOR),
new OptionDefinition(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX_VALIDATOR),
new OptionDefinition(ExecConstants.PARQUET_FLAT_READER_BULK_VALIDATOR),
new OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_NUM_RECORDS_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
@@ -51,6 +51,8 @@ public class ParquetReaderConfig {
private boolean enableTimeReadCounter = false;
private boolean autoCorrectCorruptedDates = true;
private boolean enableStringsSignedMinMax = false;
private boolean readTimeMicrosAsInt64 = false;
private boolean readTimestampMicrosAsInt64 = false;

public static ParquetReaderConfig.Builder builder() {
return new ParquetReaderConfig.Builder();
@@ -100,6 +102,16 @@ public boolean enableStringsSignedMinMax() {
return enableStringsSignedMinMax;
}

@JsonProperty("readTimeMicrosAsInt64")
public boolean readTimeMicrosAsInt64() {
return readTimeMicrosAsInt64;
}

@JsonProperty("readTimestampMicrosAsInt64")
public boolean readTimestampMicrosAsInt64() {
return readTimestampMicrosAsInt64;
}

public ParquetReadOptions toReadOptions() {
return ParquetReadOptions.builder()
.useSignedStringMinMax(enableStringsSignedMinMax)
@@ -120,7 +132,9 @@ public int hashCode() {
enableBytesTotalCounter,
enableTimeReadCounter,
autoCorrectCorruptedDates,
enableStringsSignedMinMax);
enableStringsSignedMinMax,
readTimeMicrosAsInt64,
readTimestampMicrosAsInt64);
}

@Override
@@ -136,7 +150,9 @@ public boolean equals(Object o) {
&& enableBytesTotalCounter == that.enableBytesTotalCounter
&& enableTimeReadCounter == that.enableTimeReadCounter
&& autoCorrectCorruptedDates == that.autoCorrectCorruptedDates
&& enableStringsSignedMinMax == that.enableStringsSignedMinMax;
&& enableStringsSignedMinMax == that.enableStringsSignedMinMax
&& readTimeMicrosAsInt64 == that.readTimeMicrosAsInt64
&& readTimestampMicrosAsInt64 == that.readTimestampMicrosAsInt64;
}

@Override
@@ -147,6 +163,8 @@ public String toString() {
+ ", enableTimeReadCounter=" + enableTimeReadCounter
+ ", autoCorrectCorruptedDates=" + autoCorrectCorruptedDates
+ ", enableStringsSignedMinMax=" + enableStringsSignedMinMax
+ ", readTimeMicrosAsInt64=" + readTimeMicrosAsInt64
+ ", readTimestampMicrosAsInt64=" + readTimestampMicrosAsInt64
+ '}';
}

@@ -195,6 +213,10 @@ public ParquetReaderConfig build() {
if (optVal != null && !optVal.isEmpty()) {
readerConfig.enableStringsSignedMinMax = Boolean.valueOf(optVal);
}

// The read*MicrosAsInt64 config values are set from any option scope.
readerConfig.readTimeMicrosAsInt64 = options.getBoolean(ExecConstants.PARQUET_READER_TIME_MICROS_AS_INT64);
readerConfig.readTimestampMicrosAsInt64 = options.getBoolean(ExecConstants.PARQUET_READER_TIMESTAMP_MICROS_AS_INT64);
}

return readerConfig;
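The two new flags surface on ParquetReaderConfig through the JSON-mapped getters added above and are populated from the option manager in build(). A minimal usage sketch, assuming the builder is used without supplying options so both flags keep their declared defaults of false:

ParquetReaderConfig config = ParquetReaderConfig.builder().build();
boolean timeAsInt64      = config.readTimeMicrosAsInt64();       // false unless the system/session option is set
boolean timestampAsInt64 = config.readTimestampMicrosAsInt64();  // false unless the system/session option is set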
@@ -180,11 +180,21 @@ private static ColumnReader<? extends ValueVector> getColumnReader(ParquetRecord
return new ParquetFixedWidthDictionaryReaders.DictionaryBigIntReader(recordReader, descriptor,
columnChunkMetaData, fixedLength, (BigIntVector) v, schemaElement);
case TIMESTAMP_MICROS:
return new ParquetFixedWidthDictionaryReaders.DictionaryTimeStampMicrosReader(recordReader, descriptor,
columnChunkMetaData, fixedLength, (TimeStampVector) v, schemaElement);
if (recordReader.getFragmentContext().getOptions().getBoolean(ExecConstants.PARQUET_READER_TIMESTAMP_MICROS_AS_INT64)) {
return new ParquetFixedWidthDictionaryReaders.DictionaryBigIntReader(recordReader, descriptor,
columnChunkMetaData, fixedLength, (BigIntVector) v, schemaElement);
} else {
return new ParquetFixedWidthDictionaryReaders.DictionaryTimeStampMicrosReader(recordReader, descriptor,
columnChunkMetaData, fixedLength, (TimeStampVector) v, schemaElement);
}
case TIME_MICROS:
return new ParquetFixedWidthDictionaryReaders.DictionaryTimeMicrosReader(recordReader, descriptor,
columnChunkMetaData, fixedLength, (TimeVector) v, schemaElement);
if (recordReader.getFragmentContext().getOptions().getBoolean(ExecConstants.PARQUET_READER_TIME_MICROS_AS_INT64)) {
return new ParquetFixedWidthDictionaryReaders.DictionaryBigIntReader(recordReader, descriptor,
columnChunkMetaData, fixedLength, (BigIntVector) v, schemaElement);
} else {
return new ParquetFixedWidthDictionaryReaders.DictionaryTimeMicrosReader(recordReader, descriptor,
columnChunkMetaData, fixedLength, (TimeVector) v, schemaElement);
}
case UINT_64:
return new ParquetFixedWidthDictionaryReaders.DictionaryUInt8Reader(recordReader, descriptor,
columnChunkMetaData, fixedLength, (UInt8Vector) v, schemaElement);
@@ -299,10 +309,21 @@ public static ColumnReader<?> getNullableColumnReader(ParquetRecordReader parent
case TIMESTAMP_MILLIS:
return new NullableFixedByteAlignedReaders.NullableDictionaryTimeStampReader(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, (NullableTimeStampVector)valueVec, schemaElement);
case TIME_MICROS:
return new NullableFixedByteAlignedReaders.NullableDictionaryTimeMicrosReader(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, (NullableTimeVector) valueVec, schemaElement);
if (parentReader.getFragmentContext().getOptions().getBoolean(ExecConstants.PARQUET_READER_TIME_MICROS_AS_INT64)) {
return new NullableFixedByteAlignedReaders.NullableDictionaryBigIntReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength, (NullableBigIntVector) valueVec, schemaElement);
} else {
return new NullableFixedByteAlignedReaders.NullableDictionaryTimeMicrosReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength, (NullableTimeVector) valueVec, schemaElement);
}
case TIMESTAMP_MICROS:
return new NullableFixedByteAlignedReaders.NullableDictionaryTimeStampMicrosReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength, (NullableTimeStampVector) valueVec, schemaElement);
if (parentReader.getFragmentContext().getOptions().getBoolean(ExecConstants.PARQUET_READER_TIMESTAMP_MICROS_AS_INT64)) {
return new NullableFixedByteAlignedReaders.NullableDictionaryBigIntReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength, (NullableBigIntVector) valueVec, schemaElement);
} else {
return new NullableFixedByteAlignedReaders.NullableDictionaryTimeStampMicrosReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength, (NullableTimeStampVector) valueVec, schemaElement);
}
case INT_64:
return new NullableFixedByteAlignedReaders.NullableDictionaryBigIntReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength, (NullableBigIntVector) valueVec, schemaElement);
@@ -62,10 +62,19 @@ private static TypeProtos.MinorType getMinorType(PrimitiveType.PrimitiveTypeName
ParquetReaderUtility.checkDecimalTypeEnabled(options);
return TypeProtos.MinorType.VARDECIMAL;
case TIMESTAMP_MILLIS:
case TIMESTAMP_MICROS:
return TypeProtos.MinorType.TIMESTAMP;
case TIMESTAMP_MICROS:
if (options.getBoolean(ExecConstants.PARQUET_READER_TIMESTAMP_MICROS_AS_INT64)) {
return TypeProtos.MinorType.BIGINT;
} else {
return TypeProtos.MinorType.TIMESTAMP;
}
case TIME_MICROS:
return TypeProtos.MinorType.TIME;
if (options.getBoolean(ExecConstants.PARQUET_READER_TIME_MICROS_AS_INT64)) {
return TypeProtos.MinorType.BIGINT;
} else {
return TypeProtos.MinorType.TIME;
}
default:
throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
}
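When store.parquet.reader.timestamp_micros_as_int64 is enabled, a TIMESTAMP_MICROS column is therefore resolved to BIGINT and carries the raw microsecond count instead of a millisecond-truncated TIMESTAMP. A small illustrative helper, not part of this commit, showing how such a value maps back to an instant without losing precision:

// Convert a raw microsecond epoch value (as read under the new option) back to
// a java.time.Instant, keeping the full microsecond precision.
static java.time.Instant fromEpochMicros(long rawMicros) {
  return java.time.Instant.ofEpochSecond(
      Math.floorDiv(rawMicros, 1_000_000L),           // whole seconds
      Math.floorMod(rawMicros, 1_000_000L) * 1_000L); // remaining microseconds as nanos
}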
@@ -58,6 +58,8 @@ public class FileMetadataCollector {
private final FileSystem fs;
private final boolean allColumnsInteresting;
private final boolean skipNonInteresting;
private final boolean truncateTimeMicros;
private final boolean truncateTimestampMicros;
private final Set<SchemaPath> columnSet;

private final MessageType schema;
@@ -90,6 +92,9 @@ public FileMetadataCollector(ParquetMetadata metadata,
readerConfig.autoCorrectCorruptedDates());
logger.debug("Contains corrupt dates: {}.", containsCorruptDates);

this.truncateTimeMicros = !readerConfig.readTimeMicrosAsInt64();
this.truncateTimestampMicros = !readerConfig.readTimestampMicrosAsInt64();

this.colTypeInfoMap = new HashMap<>();
for (String[] path : schema.getPaths()) {
colTypeInfoMap.put(SchemaPath.getCompoundPath(path), ColTypeInfo.of(schema, schema, path, 0, new ArrayList<>()));
@@ -208,7 +213,7 @@ private void addColumnMetadata(String[] columnName,
minValue = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) minValue);
maxValue = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) maxValue);
}
if (isMicrosecondColumnType(columnTypeMetadata.originalType)) {
if (shouldTruncateMicros(columnTypeMetadata)) {
// DRILL-8241: truncate the min/max of microsecond columns to milliseconds, otherwise the
// initial scanning of files when filtering will compare to the wrong values.
minValue = truncateMicros(minValue);
@@ -224,8 +229,10 @@ private void addColumnMetadata(String[] columnName,
columnTypeInfo.put(columnTypeMetadataKey, columnTypeMetadata);
}

private static boolean isMicrosecondColumnType(OriginalType columnType) {
return columnType == OriginalType.TIME_MICROS || columnType == OriginalType.TIMESTAMP_MICROS;
private boolean shouldTruncateMicros(Metadata_V4.ColumnTypeMetadata_v4 columnTypeMetadata) {
return (truncateTimeMicros && columnTypeMetadata.originalType == OriginalType.TIME_MICROS)
||
(truncateTimestampMicros && columnTypeMetadata.originalType == OriginalType.TIMESTAMP_MICROS);
}

private static Object truncateMicros(Object microSeconds) {
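    // Body collapsed in this diff view. A plausible sketch, not the verified
    // implementation: the microsecond statistic is reduced to millisecond
    // precision by integer division (e.g. 1_721_744_115_123_456L / 1_000L ==
    // 1_721_744_115_123L) so that min/max pruning compares against the same
    // millisecond values Drill stores when the *_micros_as_int64 options are off.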
@@ -260,14 +260,22 @@ protected PrimitiveConverter getConverterForType(String name, PrimitiveType type
case INT_64:
return getBigIntConverter(name, type);
case TIMESTAMP_MICROS: {
TimeStampWriter writer = getTimeStampWriter(name, type);
return new DrillTimeStampMicrosConverter(writer);
if (options.getBoolean(ExecConstants.PARQUET_READER_TIMESTAMP_MICROS_AS_INT64)) {
return getBigIntConverter(name, type);
} else {
TimeStampWriter writer = getTimeStampWriter(name, type);
return new DrillTimeStampMicrosConverter(writer);
}
}
case TIME_MICROS: {
TimeWriter writer = type.isRepetition(Repetition.REPEATED)
? getWriter(name, (m, f) -> m.list(f).time(), l -> l.list().time())
: getWriter(name, MapWriter::time, ListWriter::time);
return new DrillTimeMicrosConverter(writer);
if (options.getBoolean(ExecConstants.PARQUET_READER_TIME_MICROS_AS_INT64)) {
return getBigIntConverter(name, type);
} else {
TimeWriter writer = type.isRepetition(Repetition.REPEATED)
? getWriter(name, (m, f) -> m.list(f).time(), l -> l.list().time())
: getWriter(name, MapWriter::time, ListWriter::time);
return new DrillTimeMicrosConverter(writer);
}
}
case DECIMAL: {
ParquetReaderUtility.checkDecimalTypeEnabled(options);
2 changes: 2 additions & 0 deletions exec/java-exec/src/main/resources/drill-module.conf
@@ -747,6 +747,8 @@ drill.exec.options: {
store.parquet.reader.columnreader.async: false,
store.parquet.reader.enable_map_support: true,
store.parquet.reader.int96_as_timestamp: false,
store.parquet.reader.time_micros_as_int64: false,
store.parquet.reader.timestamp_micros_as_int64: false,
store.parquet.reader.pagereader.async: true,
store.parquet.reader.pagereader.bufferedread: true,
store.parquet.reader.pagereader.buffersize: 1048576,
@@ -211,12 +211,19 @@ public enum EnumType {
" required int32 rowKey; \n" +
" repeated int32 repeatedInt ( INTEGER(32,true) ) ; \n" +
"} \n";
public static final String microsecondColumnsSchemaMsg =
"message ParquetMicrosecondDataTypes { \n" +
" required int32 rowKey; \n" +
" required int64 _TIME_MICROS_int64 ( TIME_MICROS ) ; \n" +
" required int64 _TIMESTAMP_MICROS_int64 ( TIMESTAMP_MICROS ) ; \n" +
"} \n";

public static MessageType simpleSchema = MessageTypeParser.parseMessageType(simpleSchemaMsg);
public static MessageType complexSchema = MessageTypeParser.parseMessageType(complexSchemaMsg);
public static MessageType simpleNullableSchema = MessageTypeParser.parseMessageType(simpleNullableSchemaMsg);
public static MessageType complexNullableSchema = MessageTypeParser.parseMessageType(complexNullableSchemaMsg);
public static MessageType repeatedIntSchema = MessageTypeParser.parseMessageType(repeatedIntSchemaMsg);
public static MessageType microsecondColumnsSchema = MessageTypeParser.parseMessageType(microsecondColumnsSchemaMsg);


public static Path initFile(String fileName) {
@@ -488,13 +495,32 @@ public static void writeRepeatedIntValues(
}
}

public static void writeMicrosecondValues(
SimpleGroupFactory groupFactory,
ParquetWriter<Group> writer,
long[] timeMicrosValues,
long[] timestampMicrosValues) throws IOException {

int numValues = Math.min(timeMicrosValues.length, timestampMicrosValues.length);
for (int i = 0; i < numValues; i++) {

writer.write(
groupFactory.newGroup()
.append("rowKey", i + 1)
.append("_TIME_MICROS_int64", timeMicrosValues[i])
.append("_TIMESTAMP_MICROS_int64", timestampMicrosValues[i])
);
}
}

public static void main(String[] args) throws IOException {

SimpleGroupFactory sgf = new SimpleGroupFactory(simpleSchema);
GroupFactory gf = new SimpleGroupFactory(complexSchema);
SimpleGroupFactory sngf = new SimpleGroupFactory(simpleNullableSchema);
GroupFactory ngf = new SimpleGroupFactory(complexNullableSchema);
SimpleGroupFactory repeatedIntGroupFactory = new SimpleGroupFactory(repeatedIntSchema);
SimpleGroupFactory microsecondGroupFactory = new SimpleGroupFactory(microsecondColumnsSchema);

// Generate files with dictionary encoding enabled and disabled
ParquetWriter<Group> simpleWriter = initWriter(simpleSchema, "drill/parquet_test_file_simple", true);
@@ -506,6 +532,8 @@ public static void main(String[] args) throws IOException {
ParquetWriter<Group> simpleNullableNoDictWriter = initWriter(simpleNullableSchema, "drill/parquet_test_file_simple_nullable_nodict", false);
ParquetWriter<Group> complexNullableNoDictWriter = initWriter(complexNullableSchema, "drill/parquet_test_file_complex_nullable_nodict", false);
ParquetWriter<Group> repeatedIntV2Writer = initWriter(repeatedIntSchema, "drill/parquet_v2_repeated_int.parquet", ParquetProperties.WriterVersion.PARQUET_2_0, true);
ParquetWriter<Group> microsecondWriter = initWriter(microsecondColumnsSchema, "drill/microseconds.parquet", false);
ParquetWriter<Group> microsecondSmallDiffWriter = initWriter(microsecondColumnsSchema, "drill/microseconds_small_diff.parquet", false);

ParquetSimpleTestFileGenerator.writeSimpleValues(sgf, simpleWriter, false);
ParquetSimpleTestFileGenerator.writeSimpleValues(sngf, simpleNullableWriter, true);
@@ -516,6 +544,16 @@ public static void main(String[] args) throws IOException {
ParquetSimpleTestFileGenerator.writeComplexValues(gf, complexNoDictWriter, false);
ParquetSimpleTestFileGenerator.writeComplexValues(ngf, complexNullableNoDictWriter, true);
ParquetSimpleTestFileGenerator.writeRepeatedIntValues(repeatedIntGroupFactory, repeatedIntV2Writer, 100);
ParquetSimpleTestFileGenerator.writeMicrosecondValues(
microsecondGroupFactory,
microsecondWriter,
TestMicrosecondColumns.TIME_MICROS_VALUES,
TestMicrosecondColumns.TIMESTAMP_MICROS_VALUES);
ParquetSimpleTestFileGenerator.writeMicrosecondValues(
microsecondGroupFactory,
microsecondSmallDiffWriter,
TestMicrosecondColumns.TIME_MICROS_SMALL_DIFF_VALUES,
TestMicrosecondColumns.TIMESTAMP_MICROS_SMALL_DIFF_VALUES);

simpleWriter.close();
complexWriter.close();
@@ -526,7 +564,8 @@ public static void main(String[] args) throws IOException {
simpleNullableNoDictWriter.close();
complexNullableNoDictWriter.close();
repeatedIntV2Writer.close();

microsecondWriter.close();
microsecondSmallDiffWriter.close();
}

}