GH-3040: DictionaryFilter.canDrop may return false positive result when dict size exceeds 8k #3041
Conversation
super(buf);
}

// In practice, there are some implementations that always return 0 even if they have more data
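For background: the InputStream contract allows available() to return 0 even when more bytes can still be read, and the JDK's Channels.newChannel(InputStream) adapter typically transfers at most 8 KiB per read(ByteBuffer) call before consulting available() to decide whether to continue. A minimal standalone sketch (illustrative only, not code from this PR) of the resulting short read, which is why the symptom only shows up once the data exceeds 8k:

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

public class ShortReadDemo {
  public static void main(String[] args) throws IOException {
    byte[] data = new byte[9 * 1024]; // larger than the channel adapter's internal 8 KiB chunk
    InputStream in = new FilterInputStream(new ByteArrayInputStream(data)) {
      @Override
      public int available() {
        return 0; // allowed by the InputStream contract, even though data remains
      }
    };
    ReadableByteChannel channel = Channels.newChannel(in);
    ByteBuffer buf = ByteBuffer.allocate(data.length);
    int read = channel.read(buf);
    // With the common JDK implementation this prints 8192, not 9216: after the first
    // internal 8 KiB transfer the adapter sees available() == 0 and stops early.
    System.out.println("single read() copied " + read + " of " + data.length + " bytes");
  }
}
```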
In my case, the underlying InputStream is H1SeekableInputStream.
Out of curiosity, why are you using H1SeekableInputStream? This one is related to Hadoop 1.
My test code is below (sorry, the file contains private data, so I cannot share it):
import static org.apache.parquet.filter2.dictionarylevel.DictionaryFilter.canDrop;
import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
import static org.apache.parquet.filter2.predicate.FilterApi.eq;
import static org.junit.Assert.assertFalse;

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.page.DictionaryPageReadStore;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.filter2.predicate.Operators.BinaryColumn;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.io.api.Binary;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class MyDictionaryFilterTest {
  private static final Configuration conf = new Configuration();

  List<ColumnChunkMetaData> ccmd;
  ParquetFileReader reader;
  DictionaryPageReadStore dictionaries;
  private Path file = new Path("/Users/chengpan/Temp/part-2bb8404a-f6e5-4e9f-9161-f749c4bf46d0-2-2222");

  @Before
  public void setUp() throws Exception {
    reader = ParquetFileReader.open(conf, file);
    ParquetMetadata meta = reader.getFooter();
    ccmd = meta.getBlocks().get(0).getColumns();
    dictionaries = reader.getDictionaryReader(meta.getBlocks().get(0));
  }

  @After
  public void tearDown() throws Exception {
    reader.close();
  }

  @Test
  public void testEqBinary() throws Exception {
    BinaryColumn b = binaryColumn("source_id");
    FilterPredicate pred = eq(b, Binary.fromString("5059661515"));
    assertFalse(canDrop(pred, ccmd, dictionaries));
  }
}
Thanks, this is very helpful!
byte[] input = new byte[data.length + 10];
RANDOM.nextBytes(input);
System.arraycopy(data, 0, input, 0, data.length);
Supplier<BytesInput> factory = () -> BytesInput.from(new AvailableAgnosticInputStream(input), 9 * 1024);
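The AvailableAgnosticInputStream helper added by the PR is not shown in this excerpt; a minimal sketch of what such a wrapper could look like, assuming it only needs to serve the given bytes while reporting available() as 0:

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;

// Sketch only, not the PR's actual helper: serves the given bytes but
// always reports available() == 0, like some real-world streams do.
public class AvailableAgnosticInputStream extends InputStream {
  private final ByteArrayInputStream delegate;

  public AvailableAgnosticInputStream(byte[] data) {
    this.delegate = new ByteArrayInputStream(data);
  }

  @Override
  public int read() {
    return delegate.read();
  }

  @Override
  public int read(byte[] b, int off, int len) {
    return delegate.read(b, off, len);
  }

  @Override
  public int available() {
    return 0; // deliberately unhelpful; this is exactly what the test exercises
  }
}
```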
What about using an anonymous class here instead of adding a new file?
I tend to use a new file, as this is a fairly common case that needs to be tested; it might be reused elsewhere in the future.
ReadableByteChannel channel = Channels.newChannel(in);
int remaining = byteCount;
while (remaining > 0) {
  remaining -= channel.read(workBuf);
Is remaining reliable? Should we check the return value of channel.read(workBuf)?
Added a check to detect the EOF case.
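A minimal sketch of the shape that check can take (an assumption about the patch, not its exact lines): keep draining the channel until byteCount bytes have been copied into workBuf, and fail fast if the stream ends first.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

final class FullReads {
  // Copies exactly byteCount bytes from in into workBuf, or throws if EOF is hit early.
  static void readFully(InputStream in, ByteBuffer workBuf, int byteCount) throws IOException {
    ReadableByteChannel channel = Channels.newChannel(in);
    int remaining = byteCount;
    while (remaining > 0) {
      int bytesRead = channel.read(workBuf);
      if (bytesRead < 0) {
        throw new EOFException("Reached end of stream with " + remaining + " bytes still expected");
      }
      remaining -= bytesRead;
    }
  }
}
```

Checking for a negative return value also guards against the silent truncation that a single unchecked read() allowed.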
+1. Thanks @pan3793!
Great catch @pan3793
@@ -376,7 +378,15 @@ void writeInto(ByteBuffer buffer) {
ByteBuffer workBuf = buffer.duplicate();
int pos = buffer.position();
workBuf.limit(pos + byteCount);
Channels.newChannel(in).read(workBuf);
Is there any other place where it is used like this?
I went through the original PR and found nothing else; it would be great if others could double-check.
Thanks @pan3793 for finding and fixing this, and thanks @wgtmac @ConeyLiu and @gszadovszky for the review 🙌
Rationale for this change
Fixes the data loss issue reported in #3040.
What changes are included in this PR?
Ensure that StreamBytesInput#writeInto(ByteBuffer buffer) copies data fully, even if the underlying InputStream does not report available() correctly.

Are these changes tested?
Unit tests are added; I also tested the fix against an internal production data loss case.
Are there any user-facing changes?
Yes, this fixes some data loss cases. Note that the bug affects Spark 4.0.0 preview2, which ships Parquet 1.14.2.
Closes #3040