Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-3040: DictionaryFilter.canDrop may return false positive result when dict size exceeds 8k #3041

Merged
merged 4 commits into from
Nov 6, 2024

Conversation

pan3793
Copy link
Member

@pan3793 pan3793 commented Nov 3, 2024

Rationale for this change

Fixes the data loss issue that reported in #3040

What changes are included in this PR?

Ensure that StreamBytesInput#writeInto(ByteBuffer buffer) copies data fully, even if the underlying InputStream does not report available correctly.

Are these changes tested?

UTs are added, I also tested it with an internal production data loss case.

Are there any user-facing changes?

Yes, this fixes some data loss cases, and I acknowledge that the bug affects Spark 4.0.0 preview2 which ships Parquet 1.14.2.

Closes #3040

super(buf);
}

// In practice, there are some implementations always return 0 even if they has more data
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in my case, the underlying IntputStream is H1SeekableInputStream

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity. Why are you using H1SeekableInputStream? This one is related to Hadoop 1.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My test code is
(sorry, the file contains private data so I can not share)

public class MyDictionaryFilterTest {

  private static final Configuration conf = new Configuration();
  List<ColumnChunkMetaData> ccmd;
  ParquetFileReader reader;
  DictionaryPageReadStore dictionaries;
  private Path file = new Path("/Users/chengpan/Temp/part-2bb8404a-f6e5-4e9f-9161-f749c4bf46d0-2-2222");

  @Before
  public void setUp() throws Exception {
    reader = ParquetFileReader.open(conf, file);
    ParquetMetadata meta = reader.getFooter();
    ccmd = meta.getBlocks().get(0).getColumns();
    dictionaries = reader.getDictionaryReader(meta.getBlocks().get(0));
  }

  @After
  public void tearDown() throws Exception {
    reader.close();
  }

  @Test
  public void testEqBinary() throws Exception {
    BinaryColumn b = binaryColumn("source_id");
    FilterPredicate pred = eq(b, Binary.fromString("5059661515"));

    assertFalse(canDrop(pred, ccmd, dictionaries));
  }
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, this is very helpful!

@pan3793
Copy link
Member Author

pan3793 commented Nov 3, 2024

cc @gszadovszky @wgtmac @Fokko

byte[] input = new byte[data.length + 10];
RANDOM.nextBytes(input);
System.arraycopy(data, 0, input, 0, data.length);
Supplier<BytesInput> factory = () -> BytesInput.from(new AvailableAgnosticInputStream(input), 9 * 1024);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about using an anonymous class here instead of adding a new file?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tend to use a new file as it is a quite common case that needs to be tested, it might be used in other places in the future.

ReadableByteChannel channel = Channels.newChannel(in);
int remaining = byteCount;
while (remaining > 0) {
remaining -= channel.read(workBuf);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is remaining reliable? Should we check the return value of channel.read(workBuf)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a check to detect EOF case

Copy link
Member

@wgtmac wgtmac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. Thanks @pan3793!

Copy link
Contributor

@Fokko Fokko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great catch @pan3793

@Fokko Fokko added this to the 1.14.4 milestone Nov 4, 2024
@@ -376,7 +378,15 @@ void writeInto(ByteBuffer buffer) {
ByteBuffer workBuf = buffer.duplicate();
int pos = buffer.position();
workBuf.limit(pos + byteCount);
Channels.newChannel(in).read(workBuf);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any other place that is used like this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went through the original PR and found nothing else, it would be great if others have a double check.

@Fokko Fokko merged commit 2e765cc into apache:master Nov 6, 2024
9 checks passed
@Fokko
Copy link
Contributor

Fokko commented Nov 6, 2024

Thanks @pan3793 for finding and fixing this, and thanks @wgtmac @ConeyLiu and @gszadovszky for the review 🙌

Fokko pushed a commit that referenced this pull request Nov 6, 2024
…en dict size exceeds 8k (#3041)

* GH-3040: DictionaryFilter.canDrop may return false positive result when dict size exceeds 8k

* style

* check bytesRead

* import
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

DictionaryFilter.canDrop may return false positive result when dict size exceeds 8k
5 participants