Skip to content

Commit 6cafe28

Browse files
authored
fix: handle loading of complex types into CometVector correctly in iceberg_compat scans (#1279)
1 parent 274566f commit 6cafe28

File tree

4 files changed

+42
-51
lines changed

4 files changed

+42
-51
lines changed

common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.slf4j.LoggerFactory;
2424

2525
import org.apache.parquet.column.ColumnDescriptor;
26+
import org.apache.parquet.schema.Type;
2627
import org.apache.spark.sql.types.DataType;
2728
import org.apache.spark.sql.types.TimestampNTZType$;
2829

@@ -36,6 +37,9 @@ public abstract class AbstractColumnReader implements AutoCloseable {
3637
/** The Spark data type. */
3738
protected final DataType type;
3839

40+
/** The underlying Parquet field type (may be null when constructed without one). */
41+
protected final Type fieldType;
42+
3943
/** Parquet column descriptor. */
4044
protected final ColumnDescriptor descriptor;
4145

@@ -61,13 +65,23 @@ public abstract class AbstractColumnReader implements AutoCloseable {
6165

6266
public AbstractColumnReader(
6367
DataType type,
68+
Type fieldType,
6469
ColumnDescriptor descriptor,
6570
boolean useDecimal128,
6671
boolean useLegacyDateTimestamp) {
6772
this.type = type;
73+
this.fieldType = fieldType;
6874
this.descriptor = descriptor;
6975
this.useDecimal128 = useDecimal128;
7076
this.useLegacyDateTimestamp = useLegacyDateTimestamp;
77+
}
78+
79+
public AbstractColumnReader(
80+
DataType type,
81+
ColumnDescriptor descriptor,
82+
boolean useDecimal128,
83+
boolean useLegacyDateTimestamp) {
84+
this(type, null, descriptor, useDecimal128, useLegacyDateTimestamp);
7185
TypeUtil.checkParquetType(descriptor, type);
7286
}
7387

common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import org.apache.comet.shims.ShimBatchReader;
7373
import org.apache.comet.shims.ShimFileFormat;
7474
import org.apache.comet.vector.CometVector;
75+
import org.apache.comet.vector.NativeUtil;
7576

7677
/**
7778
* A vectorized Parquet reader that reads a Parquet file in a batched fashion.
@@ -94,6 +95,7 @@
9495
public class NativeBatchReader extends RecordReader<Void, ColumnarBatch> implements Closeable {
9596
private static final Logger LOG = LoggerFactory.getLogger(NativeBatchReader.class);
9697
protected static final BufferAllocator ALLOCATOR = new RootAllocator();
98+
private NativeUtil nativeUtil = new NativeUtil();
9799

98100
private Configuration conf;
99101
private int capacity;
@@ -266,7 +268,8 @@ public void init() throws URISyntaxException, IOException {
266268

267269
//// Create Column readers
268270
List<ColumnDescriptor> columns = requestedSchema.getColumns();
269-
int numColumns = columns.size();
271+
List<Type> fields = requestedSchema.getFields();
272+
int numColumns = fields.size();
270273
if (partitionSchema != null) numColumns += partitionSchema.size();
271274
columnReaders = new AbstractColumnReader[numColumns];
272275

@@ -454,6 +457,7 @@ public void close() throws IOException {
454457
importer.close();
455458
importer = null;
456459
}
460+
nativeUtil.close();
457461
Native.closeRecordBatchReader(this.handle);
458462
}
459463

@@ -469,19 +473,23 @@ private int loadNextBatch() throws Throwable {
469473
importer = new CometSchemaImporter(ALLOCATOR);
470474

471475
List<ColumnDescriptor> columns = requestedSchema.getColumns();
472-
for (int i = 0; i < columns.size(); i++) {
476+
List<Type> fields = requestedSchema.getFields();
477+
for (int i = 0; i < fields.size(); i++) {
473478
// TODO: (ARROW NATIVE) check this. Currently not handling missing columns correctly?
474479
if (missingColumns[i]) continue;
475480
if (columnReaders[i] != null) columnReaders[i].close();
476481
// TODO: (ARROW NATIVE) handle tz, datetime & int96 rebase
477482
DataType dataType = sparkSchema.fields()[i].dataType();
483+
Type field = fields.get(i);
478484
NativeColumnReader reader =
479485
new NativeColumnReader(
480486
this.handle,
481487
i,
482488
dataType,
483-
columns.get(i),
489+
field,
490+
null,
484491
importer,
492+
nativeUtil,
485493
capacity,
486494
useDecimal128,
487495
useLegacyDateTimestamp);

common/src/main/java/org/apache/comet/parquet/NativeColumnReader.java

Lines changed: 16 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,8 @@
2727
import org.apache.arrow.c.CometSchemaImporter;
2828
import org.apache.arrow.memory.BufferAllocator;
2929
import org.apache.arrow.memory.RootAllocator;
30-
import org.apache.arrow.vector.FieldVector;
31-
import org.apache.arrow.vector.dictionary.Dictionary;
32-
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
3330
import org.apache.parquet.column.ColumnDescriptor;
34-
import org.apache.parquet.column.page.*;
35-
import org.apache.parquet.schema.LogicalTypeAnnotation;
31+
import org.apache.parquet.schema.Type;
3632
import org.apache.spark.sql.types.DataType;
3733

3834
import org.apache.comet.vector.*;
@@ -65,6 +61,7 @@ public class NativeColumnReader extends AbstractColumnReader {
6561
boolean hadNull;
6662

6763
private final CometSchemaImporter importer;
64+
private final NativeUtil nativeUtil;
6865

6966
private ArrowArray array = null;
7067
private ArrowSchema schema = null;
@@ -76,14 +73,17 @@ public NativeColumnReader(
7673
long nativeBatchHandle,
7774
int columnNum,
7875
DataType type,
76+
Type fieldType,
7977
ColumnDescriptor descriptor,
8078
CometSchemaImporter importer,
79+
NativeUtil nativeUtil,
8180
int batchSize,
8281
boolean useDecimal128,
8382
boolean useLegacyDateTimestamp) {
84-
super(type, descriptor, useDecimal128, useLegacyDateTimestamp);
83+
super(type, fieldType, descriptor, useDecimal128, useLegacyDateTimestamp);
8584
assert batchSize > 0 : "Batch size must be positive, found " + batchSize;
8685
this.batchSize = batchSize;
86+
this.nativeUtil = nativeUtil;
8787
this.importer = importer;
8888
this.nativeBatchHandle = nativeBatchHandle;
8989
this.columnNum = columnNum;
@@ -94,13 +94,13 @@ public NativeColumnReader(
9494
// Override in order to avoid creation of JVM side column readers
9595
protected void initNative() {
9696
LOG.debug(
97-
"Native column reader " + String.join(".", this.descriptor.getPath()) + " is initialized");
97+
"Native column reader {} is initialized", String.join(".", this.type.catalogString()));
9898
nativeHandle = 0;
9999
}
100100

101101
@Override
102102
public void readBatch(int total) {
103-
LOG.debug("Reading column batch of size = " + total);
103+
LOG.debug("Reading column batch of size = {}", total);
104104

105105
this.currentNumValues = total;
106106
}
@@ -131,10 +131,7 @@ public CometDecodedVector loadVector() {
131131
currentVector.close();
132132
}
133133

134-
LogicalTypeAnnotation logicalTypeAnnotation =
135-
descriptor.getPrimitiveType().getLogicalTypeAnnotation();
136-
boolean isUuid =
137-
logicalTypeAnnotation instanceof LogicalTypeAnnotation.UUIDLogicalTypeAnnotation;
134+
// TODO: ARROW NATIVE : Handle Uuid?
138135

139136
array = ArrowArray.allocateNew(ALLOCATOR);
140137
schema = ArrowSchema.allocateNew(ALLOCATOR);
@@ -144,47 +141,19 @@ public CometDecodedVector loadVector() {
144141

145142
Native.currentColumnBatch(nativeBatchHandle, columnNum, arrayAddr, schemaAddr);
146143

147-
FieldVector vector = importer.importVector(array, schema);
144+
ArrowArray[] arrays = {array};
145+
ArrowSchema[] schemas = {schema};
148146

149-
DictionaryEncoding dictionaryEncoding = vector.getField().getDictionary();
150-
151-
CometPlainVector cometVector = new CometPlainVector(vector, useDecimal128);
147+
CometDecodedVector cometVector =
148+
(CometDecodedVector)
149+
scala.collection.JavaConverters.seqAsJavaList(nativeUtil.importVector(arrays, schemas))
150+
.get(0);
152151

153152
// Update whether the current vector contains any null values. This is used in the following
154153
// batch(s) to determine whether we can skip loading the native vector.
155154
hadNull = cometVector.hasNull();
156155

157-
if (dictionaryEncoding == null) {
158-
if (dictionary != null) {
159-
// This means the column was using dictionary encoding but now has fall-back to plain
160-
// encoding, on the native side. Setting 'dictionary' to null here, so we can use it as
161-
// a condition to check if we can re-use vector later.
162-
dictionary = null;
163-
}
164-
// Either the column is not dictionary encoded, or it was using dictionary encoding but
165-
// a new data page has switched back to use plain encoding. For both cases we should
166-
// return plain vector.
167-
currentVector = cometVector;
168-
return currentVector;
169-
}
170-
171-
// We should already re-initiate `CometDictionary` here because `Data.importVector` API will
172-
// release the previous dictionary vector and create a new one.
173-
Dictionary arrowDictionary = importer.getProvider().lookup(dictionaryEncoding.getId());
174-
CometPlainVector dictionaryVector =
175-
new CometPlainVector(arrowDictionary.getVector(), useDecimal128, isUuid);
176-
if (dictionary != null) {
177-
dictionary.setDictionaryVector(dictionaryVector);
178-
} else {
179-
dictionary = new CometDictionary(dictionaryVector);
180-
}
181-
182-
currentVector =
183-
new CometDictionaryVector(
184-
cometVector, dictionary, importer.getProvider(), useDecimal128, false, isUuid);
185-
186-
currentVector =
187-
new CometDictionaryVector(cometVector, dictionary, importer.getProvider(), useDecimal128);
156+
currentVector = cometVector;
188157
return currentVector;
189158
}
190159
}

common/src/main/java/org/apache/comet/vector/CometVector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ public DictionaryProvider getDictionaryProvider() {
232232
* @param useDecimal128 Whether to use Decimal128 for decimal column
233233
* @return `CometVector` implementation
234234
*/
235-
protected static CometVector getVector(
235+
public static CometVector getVector(
236236
ValueVector vector, boolean useDecimal128, DictionaryProvider dictionaryProvider) {
237237
if (vector instanceof StructVector) {
238238
return new CometStructVector(vector, useDecimal128, dictionaryProvider);

0 commit comments

Comments
 (0)