Skip to content

Commit

Permalink
[core] Support bloom filter in sort file store reader (#4132)
Browse files Browse the repository at this point in the history
  • Loading branch information
FangYongs authored Sep 5, 2024
1 parent aa04090 commit d56a6cf
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySlice;
import org.apache.paimon.memory.MemorySliceInput;
import org.apache.paimon.utils.BloomFilter;
import org.apache.paimon.utils.MurmurHashUtils;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -53,6 +55,7 @@ public class SortLookupStoreReader implements LookupStoreReader {
private final long fileSize;

private final BlockIterator indexBlockIterator;
@Nullable private final BloomFilter bloomFilter;

public SortLookupStoreReader(
Comparator<MemorySlice> comparator,
Expand All @@ -68,7 +71,18 @@ public SortLookupStoreReader(

Footer footer = readFooter();
this.indexBlockIterator = readBlock(footer.getIndexBlockHandle()).iterator();
// TODO read bloom filter block
this.bloomFilter = readBloomFilter(footer.getBloomFilterHandle());
}

private BloomFilter readBloomFilter(@Nullable BloomFilterHandle bloomFilterHandle)
throws IOException {
BloomFilter bloomFilter = null;
if (bloomFilterHandle != null) {
MemorySegment segment = read(bloomFilterHandle.offset(), bloomFilterHandle.size());
bloomFilter = new BloomFilter(bloomFilterHandle.expectedEntries(), segment.size());
bloomFilter.setMemorySegment(segment, 0);
}
return bloomFilter;
}

private Footer readFooter() throws IOException {
Expand All @@ -79,6 +93,10 @@ private Footer readFooter() throws IOException {
@Nullable
@Override
public byte[] lookup(byte[] key) throws IOException {
if (bloomFilter != null && !bloomFilter.testHash(MurmurHashUtils.hashBytes(key))) {
return null;
}

MemorySlice keySlice = MemorySlice.wrap(key);
// seek the index to the block containing the key
indexBlockIterator.seekTo(keySlice);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,60 @@
import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.lookup.LookupStoreFactory.Context;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension;
import org.apache.paimon.testutils.junit.parameterized.Parameters;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BloomFilter;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

import static org.assertj.core.api.Assertions.assertThat;

/** Test for {@link SortLookupStoreFactory}. */
@ExtendWith(ParameterizedTestExtension.class)
public class SortLookupStoreFactoryTest {
private static final int VALUE_COUNT = 10_000_000;
private static final int QUERY_COUNT = 10_000;

private final ThreadLocalRandom rnd = ThreadLocalRandom.current();
private final boolean bloomFilterEnabled;
private final CompressOptions compress;

@TempDir Path tempDir;

private File file;

public SortLookupStoreFactoryTest(List<Object> var) {
this.bloomFilterEnabled = (Boolean) var.get(0);
this.compress = new CompressOptions((String) var.get(1), 1);
}

@SuppressWarnings("unused")
@Parameters(name = "enableBf&compress-{0}")
public static List<List<Object>> getVarSeg() {
return Arrays.asList(
Arrays.asList(true, "none"),
Arrays.asList(false, "none"),
Arrays.asList(false, "lz4"),
Arrays.asList(true, "lz4"),
Arrays.asList(false, "zstd"),
Arrays.asList(true, "zstd"));
}

@BeforeEach
public void before() throws Exception {
file = new File(tempDir.toFile(), UUID.randomUUID().toString());
Expand All @@ -60,16 +86,17 @@ public void before() throws Exception {
}
}

@Test
@TestTemplate
public void testNormal() throws IOException {
SortLookupStoreFactory factory =
new SortLookupStoreFactory(
Comparator.naturalOrder(),
new CacheManager(MemorySize.ofMebiBytes(1)),
1024,
CompressOptions.defaultOptions());
compress);

SortLookupStoreWriter writer = factory.createWriter(file, null);
SortLookupStoreWriter writer =
factory.createWriter(file, createBloomFiler(bloomFilterEnabled));
for (int i = 0; i < VALUE_COUNT; i++) {
byte[] bytes = toBytes(i);
writer.put(bytes, bytes);
Expand All @@ -82,10 +109,13 @@ public void testNormal() throws IOException {
byte[] bytes = toBytes(query);
assertThat(fromBytes(reader.lookup(bytes))).isEqualTo(query);
}

assertThat(reader.lookup(toBytes(VALUE_COUNT + 1000))).isNull();

reader.close();
}

@Test
@TestTemplate
public void testIntKey() throws IOException {
RowCompactedSerializer keySerializer =
new RowCompactedSerializer(RowType.of(new IntType()));
Expand All @@ -95,8 +125,9 @@ public void testIntKey() throws IOException {
keySerializer.createSliceComparator(),
new CacheManager(MemorySize.ofMebiBytes(1)),
64 * 1024,
CompressOptions.defaultOptions());
SortLookupStoreWriter writer = factory.createWriter(file, null);
compress);
SortLookupStoreWriter writer =
factory.createWriter(file, createBloomFiler(bloomFilterEnabled));
for (int i = 0; i < VALUE_COUNT; i++) {
byte[] bytes = toBytes(keySerializer, row, i);
writer.put(bytes, toBytes(i));
Expand All @@ -114,9 +145,19 @@ public void testIntKey() throws IOException {
throw new RuntimeException(e);
}
}

assertThat(reader.lookup(toBytes(VALUE_COUNT + 1000))).isNull();

reader.close();
}

private BloomFilter.Builder createBloomFiler(boolean enabled) {
if (!enabled) {
return null;
}
return BloomFilter.builder(100, 0.01);
}

private byte[] toBytes(RowCompactedSerializer serializer, GenericRow row, int i) {
row.setField(0, i);
return serializer.serializeToBytes(row);
Expand Down

0 comments on commit d56a6cf

Please sign in to comment.