Skip to content

Commit

Permalink
DRILL-8486: fix handling of long variable length entries during bulk parquet reading (#2898)
Browse files Browse the repository at this point in the history
  • Loading branch information
rymarm authored Apr 10, 2024
1 parent 86d3c66 commit af7cfcd
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ final VarLenColumnBulkEntry getEntry(int valuesToRead) {
if (bulkProcess()) {
return getEntryBulk(valuesToRead);
}
return getEntrySingle(valuesToRead);
return getEntrySingle();
}

private final VarLenColumnBulkEntry getEntryBulk(int valuesToRead) {
Expand Down Expand Up @@ -82,7 +82,7 @@ private final VarLenColumnBulkEntry getEntryBulk(int valuesToRead) {
// We're here either because a) the Parquet metadata is wrong (advertises more values than the real count)
// or the first value being processed ended up to be too long for the buffer.
if (numValues == 0) {
return getEntrySingle(valuesToRead);
return getEntrySingle();
}

// Now set the bulk entry
Expand All @@ -91,14 +91,15 @@ private final VarLenColumnBulkEntry getEntryBulk(int valuesToRead) {
return entry;
}

private final VarLenColumnBulkEntry getEntrySingle(int valsToReadWithinPage) {
private VarLenColumnBulkEntry getEntrySingle() {
final ValuesReaderWrapper valueReader = pageInfo.encodedValueReader;
final int[] valueLengths = entry.getValuesLength();
final Binary currEntry = valueReader.getEntry();
final int dataLen = currEntry.length();

// Is there enough memory to handle this large value?
if (batchMemoryConstraintsReached(0, 4, dataLen)) {
valueReader.pushBack(currEntry);
entry.set(0, 0, 0, 0); // no data to be consumed
return entry;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ VarLenColumnBulkEntry getEntry(int valuesToRead) {
if (bulkProcess()) {
return getEntryBulk(valuesToRead);
}
return getEntrySingle(valuesToRead);
return getEntrySingle();
}

private final VarLenColumnBulkEntry getEntryBulk(int valuesToRead) {
Expand Down Expand Up @@ -92,7 +92,7 @@ private final VarLenColumnBulkEntry getEntryBulk(int valuesToRead) {
// We're here either because a) the Parquet metadata is wrong (advertises more values than the real count)
// or the first value being processed ended up to be too long for the buffer.
if (numValues == 0) {
return getEntrySingle(valuesToRead);
return getEntrySingle();
}

// Update the page data buffer offset
Expand All @@ -109,7 +109,7 @@ private final VarLenColumnBulkEntry getEntryBulk(int valuesToRead) {
return entry;
}

private final VarLenColumnBulkEntry getEntrySingle(int valuesToRead) {
private VarLenColumnBulkEntry getEntrySingle() {

if (remainingPageData() < 4) {
final String message = String.format("Invalid Parquet page metadata; cannot process advertised page count..");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,32 @@
package org.apache.drill.exec.store.parquet.columnreaders;

import com.google.common.base.Preconditions;

import java.nio.ByteBuffer;

import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo;
import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ValuesReaderWrapper;
import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.PageDataInfo;
import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.VarLenColumnBulkInputCallback;
import org.apache.parquet.io.api.Binary;

/** Handles nullable variable data types using a dictionary */
/**
* Handles nullable variable data types using a dictionary
*/
final class VarLenNullableDictionaryReader extends VarLenAbstractPageEntryReader {

VarLenNullableDictionaryReader(ByteBuffer buffer,
PageDataInfo pageInfo,
ColumnPrecisionInfo columnPrecInfo,
VarLenColumnBulkEntry entry,
VarLenColumnBulkInputCallback containerCallback) {
PageDataInfo pageInfo,
ColumnPrecisionInfo columnPrecInfo,
VarLenColumnBulkEntry entry,
VarLenColumnBulkInputCallback containerCallback) {

super(buffer, pageInfo, columnPrecInfo, entry, containerCallback);
}

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Override
final VarLenColumnBulkEntry getEntry(int valuesToRead) {
assert valuesToRead > 0;
Expand All @@ -46,7 +52,7 @@ final VarLenColumnBulkEntry getEntry(int valuesToRead) {
if (bulkProcess()) {
return getEntryBulk(valuesToRead);
}
return getEntrySingle(valuesToRead);
return getEntrySingle();
}

private final VarLenColumnBulkEntry getEntryBulk(int valuesToRead) {
Expand All @@ -66,7 +72,7 @@ private final VarLenColumnBulkEntry getEntryBulk(int valuesToRead) {
// Initialize the reader if needed
pageInfo.definitionLevels.readFirstIntegerIfNeeded();

for (int idx = 0; idx < readBatch; ++idx ) {
for (int idx = 0; idx < readBatch; ++idx) {
if (pageInfo.definitionLevels.readCurrInteger() == 1) {
final Binary currEntry = valueReader.getEntry();
final int dataLen = currEntry.length();
Expand Down Expand Up @@ -97,15 +103,15 @@ private final VarLenColumnBulkEntry getEntryBulk(int valuesToRead) {
// We're here either because a) the Parquet metadata is wrong (advertises more values than the real count)
// or the first value being processed ended up to be too long for the buffer.
if (numValues == 0) {
return getEntrySingle(valuesToRead);
return getEntrySingle();
}

entry.set(0, tgtPos, numValues, numValues - numNulls);

return entry;
}

private final VarLenColumnBulkEntry getEntrySingle(int valsToReadWithinPage) {
private VarLenColumnBulkEntry getEntrySingle() {
final int[] valueLengths = entry.getValuesLength();
final ValuesReaderWrapper valueReader = pageInfo.encodedValueReader;

Expand All @@ -118,6 +124,7 @@ private final VarLenColumnBulkEntry getEntrySingle(int valsToReadWithinPage) {

// Is there enough memory to handle this large value?
if (batchMemoryConstraintsReached(1, 4, dataLen)) {
valueReader.pushBack(currEntry);
entry.set(0, 0, 0, 0); // no data to be consumed
return entry;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ VarLenColumnBulkEntry getEntry(int valuesToRead) {
if (bulkProcess()) {
return getEntryBulk(valuesToRead);
}
return getEntrySingle(valuesToRead);
return getEntrySingle();
}

VarLenColumnBulkEntry getEntryBulk(int valuesToRead) {
Expand Down Expand Up @@ -108,7 +108,7 @@ VarLenColumnBulkEntry getEntryBulk(int valuesToRead) {
// We're here either because a) the Parquet metadata is wrong (advertises more values than the real count)
// or the first value being processed ended up to be too long for the buffer.
if (numValues == 0) {
return getEntrySingle(valuesToRead);
return getEntrySingle();
}

// Update the page data buffer offset
Expand All @@ -126,7 +126,7 @@ VarLenColumnBulkEntry getEntryBulk(int valuesToRead) {
return entry;
}

VarLenColumnBulkEntry getEntrySingle(int valuesToRead) {
VarLenColumnBulkEntry getEntrySingle() {

// Initialize the reader if needed
pageInfo.definitionLevels.readFirstIntegerIfNeeded();
Expand Down

0 comments on commit af7cfcd

Please sign in to comment.