From 63bd4638e2053d0d0e84b58f91b93a25b0aeb4f1 Mon Sep 17 00:00:00 2001 From: rtjd6554 <174791724+rtjd6554@users.noreply.github.com> Date: Thu, 8 Aug 2024 14:47:36 +0100 Subject: [PATCH 01/16] Refactor Integration tests into unit test --- .../sleeper/splitter/SplitPartitionIT.java | 51 +++++++++++++++++-- 1 file changed, 46 insertions(+), 5 deletions(-) diff --git a/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionIT.java b/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionIT.java index f3d5bca607..0295ddebef 100644 --- a/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionIT.java +++ b/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionIT.java @@ -33,6 +33,7 @@ import sleeper.core.schema.type.LongType; import sleeper.core.schema.type.StringType; import sleeper.core.statestore.FileReference; +import sleeper.core.statestore.FileReferenceFactory; import sleeper.core.statestore.StateStore; import sleeper.ingest.IngestRecordsFromIterator; import sleeper.ingest.impl.IngestCoordinator; @@ -40,14 +41,19 @@ import sleeper.ingest.impl.partitionfilewriter.DirectPartitionFileWriterFactory; import sleeper.ingest.impl.recordbatch.arraylist.ArrayListRecordBatchFactory; import sleeper.ingest.testutils.IngestCoordinatorTestHelper; +import sleeper.sketches.Sketches; +import sleeper.splitter.SplitMultiDimensionalPartitionImpl.SketchesLoader; import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -65,6 +71,8 @@ public class SplitPartitionIT { private final Field field = new Field("key", new IntType()); private final Schema schema = Schema.builder().rowKeyFields(field).build(); + + private Map fileToSketchMap = new HashMap<>(); private String localDir; private String filePathPrefix; @@ -266,12 +274,13 @@ void shouldSplitPartitionForIntKey() throws Exception { .singlePartition("A") .buildList()); IntStream.range(0, 10) - .forEach(i -> ingestFileFromRecords(schema, stateStore, + .forEach(i -> ingestRecordsToSketch(schema, stateStore, IntStream.range(100 * i, 100 * (i + 1)) - .mapToObj(r -> new Record(Map.of("key", r))))); + .mapToObj(r -> new Record(Map.of("key", r))), + "A")); // When - splitSinglePartition(schema, stateStore, generateIds("B", "C")); + splitSinglePartitionFromMap(schema, stateStore, generateIds("B", "C")); // Then assertThat(stateStore.getAllPartitions()) @@ -535,6 +544,25 @@ private static void ingestRecordsFromIterator( new IngestRecordsFromIterator(ingestCoordinator, recordIterator).write(); } + private void ingestRecordsToSketch(Schema schema, StateStore stateStore, Stream recordsStream, String partitionId) { + Sketches sketches = Sketches.from(schema); + AtomicLong recordCount = new AtomicLong(); + + recordsStream.forEach(rec -> { + sketches.update(schema, rec); + recordCount.incrementAndGet(); + }); + + FileReference recordFileReference = FileReferenceFactory.from(stateStore).partitionFile(partitionId, UUID.randomUUID().toString(), recordCount.get()); + try { + stateStore.addFile(recordFileReference); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + + fileToSketchMap.put(recordFileReference.getFilename(), sketches); + } + private void ingestFileFromRecords(Schema schema, StateStore stateStore, Stream recordsStream) { try { ingestRecordsFromIterator(schema, stateStore, localDir, filePathPrefix, recordsStream.iterator()); @@ -543,15 +571,28 @@ private void ingestFileFromRecords(Schema schema, StateStore stateStore, Stream< } } - private static void splitSinglePartition(Schema schema, StateStore stateStore, Supplier generateIds) throws Exception { + private static void splitSinglePartition(StateStore stateStore, SplitPartition partitionSplitter) throws Exception { Partition partition = stateStore.getAllPartitions().get(0); List fileNames = stateStore.getFileReferences().stream() .map(FileReference::getFilename) .collect(Collectors.toList()); - SplitPartition partitionSplitter = new SplitPartition(stateStore, schema, loadFromFile(schema, new Configuration()), generateIds); partitionSplitter.splitPartition(partition, fileNames); } + private void splitSinglePartition(Schema schema, StateStore stateStore, Supplier generateIds) throws Exception { + SplitPartition partitionSplitter = new SplitPartition(stateStore, schema, loadFromFile(schema, new Configuration()), generateIds); + splitSinglePartition(stateStore, partitionSplitter); + } + + private void splitSinglePartitionFromMap(Schema schema, StateStore stateStore, Supplier generateIds) throws Exception { + SplitPartition partitionSplitter = new SplitPartition(stateStore, schema, loadFromMap(schema, new Configuration()), generateIds); + splitSinglePartition(stateStore, partitionSplitter); + } + + public SketchesLoader loadFromMap(Schema schema, Configuration conf) { + return (filename) -> fileToSketchMap.get(filename); + } + private static void splitPartition(Schema schema, StateStore stateStore, String partitionId, Supplier generateIds) throws Exception { PartitionTree tree = new PartitionTree(stateStore.getAllPartitions()); Partition partition = tree.getPartition(partitionId); From cb1b50ff782a251b1f8c97b395f8459db5ed7f8f Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Thu, 8 Aug 2024 14:23:00 +0000 Subject: [PATCH 02/16] Adjust signature of ingestRecordsToSketchOnPartition --- .../src/test/java/sleeper/splitter/SplitPartitionIT.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionIT.java b/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionIT.java index 0295ddebef..f3471d3699 100644 --- a/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionIT.java +++ b/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionIT.java @@ -274,10 +274,9 @@ void shouldSplitPartitionForIntKey() throws Exception { .singlePartition("A") .buildList()); IntStream.range(0, 10) - .forEach(i -> ingestRecordsToSketch(schema, stateStore, + .forEach(i -> ingestRecordsToSketchOnPartition(schema, stateStore, "A", IntStream.range(100 * i, 100 * (i + 1)) - .mapToObj(r -> new Record(Map.of("key", r))), - "A")); + .mapToObj(r -> new Record(Map.of("key", r))))); // When splitSinglePartitionFromMap(schema, stateStore, generateIds("B", "C")); @@ -544,7 +543,7 @@ private static void ingestRecordsFromIterator( new IngestRecordsFromIterator(ingestCoordinator, recordIterator).write(); } - private void ingestRecordsToSketch(Schema schema, StateStore stateStore, Stream recordsStream, String partitionId) { + private void ingestRecordsToSketchOnPartition(Schema schema, StateStore stateStore, String partitionId, Stream recordsStream) { Sketches sketches = Sketches.from(schema); AtomicLong recordCount = new AtomicLong(); From d50892317785f549325eb6c09e0247668d81285a Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Thu, 8 Aug 2024 14:27:21 +0000 Subject: [PATCH 03/16] Replace usages of splitSinglePartition --- .../sleeper/splitter/SplitPartitionIT.java | 29 ++++++++----------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionIT.java b/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionIT.java index f3471d3699..ea671e8b51 100644 --- a/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionIT.java +++ b/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionIT.java @@ -279,7 +279,7 @@ void shouldSplitPartitionForIntKey() throws Exception { .mapToObj(r -> new Record(Map.of("key", r))))); // When - splitSinglePartitionFromMap(schema, stateStore, generateIds("B", "C")); + splitSinglePartition(schema, stateStore, generateIds("B", "C")); // Then assertThat(stateStore.getAllPartitions()) @@ -297,7 +297,7 @@ void shouldSplitPartitionForLongKey() throws Exception { .singlePartition("A") .buildList()); IntStream.range(0, 10) - .forEach(i -> ingestFileFromRecords(schema, stateStore, + .forEach(i -> ingestRecordsToSketchOnPartition(schema, stateStore, "A", LongStream.range(100L * i, 100L * (i + 1)) .mapToObj(r -> new Record(Map.of("key", r))))); @@ -320,7 +320,7 @@ void shouldSplitPartitionForStringKey() throws Exception { .singlePartition("A") .buildList()); IntStream.range(0, 10) - .forEach(i -> ingestFileFromRecords(schema, stateStore, + .forEach(i -> ingestRecordsToSketchOnPartition(schema, stateStore, "A", IntStream.range(0, 100) .mapToObj(r -> new Record(Map.of("key", String.format("A%s%s", i, r)))))); @@ -343,7 +343,7 @@ void shouldSplitPartitionForByteArrayKey() throws Exception { .singlePartition("A") .buildList()); IntStream.range(0, 10) - .forEach(i -> ingestFileFromRecords(schema, stateStore, + .forEach(i -> ingestRecordsToSketchOnPartition(schema, stateStore, "A", IntStream.range(0, 100) .mapToObj(r -> new Record(Map.of("key", new byte[]{(byte) r}))))); @@ -372,7 +372,7 @@ public void shouldSplitIntKeyOnFirstDimension() throws Exception { .singlePartition("A") .buildList()); IntStream.range(0, 10) - .forEach(i -> ingestFileFromRecords(schema, stateStore, + .forEach(i -> ingestRecordsToSketchOnPartition(schema, stateStore, "A", IntStream.range(0, 100) .mapToObj(r -> new Record(Map.of( "key1", r, @@ -399,7 +399,7 @@ public void shouldSplitIntKeyOnSecondDimensionWhenAllValuesForFirstKeyAreTheSame .singlePartition("A") .buildList()); IntStream.range(0, 10) - .forEach(i -> ingestFileFromRecords(schema, stateStore, + .forEach(i -> ingestRecordsToSketchOnPartition(schema, stateStore, "A", IntStream.range(0, 100) .mapToObj(r -> new Record(Map.of( "key1", 10, @@ -426,7 +426,7 @@ public void shouldSplitIntKeyOnFirstDimensionWhenSecondDimensionCanBeSplit() thr .singlePartition("A") .buildList()); IntStream.range(0, 10) - .forEach(i -> ingestFileFromRecords(schema, stateStore, + .forEach(i -> ingestRecordsToSketchOnPartition(schema, stateStore, "A", IntStream.range(0, 100) .mapToObj(r -> new Record(Map.of( "key1", r, @@ -453,7 +453,7 @@ public void shouldSplitIntKeyOnSecondDimensionWhenMinAndMedianForFirstKeyAreTheS .singlePartition("A") .buildList()); IntStream.range(0, 10) - .forEach(i -> ingestFileFromRecords(schema, stateStore, + .forEach(i -> ingestRecordsToSketchOnPartition(schema, stateStore, "A", IntStream.range(0, 100) // The majority of the values are 10; so min should equal median .mapToObj(r -> new Record(Map.of( @@ -482,7 +482,7 @@ public void shouldSplitByteKeyOnFirstDimension() throws Exception { .buildList()); IntStream.range(0, 10) - .forEach(i -> ingestFileFromRecords(schema, stateStore, + .forEach(i -> ingestRecordsToSketchOnPartition(schema, stateStore, "A", IntStream.range(0, 100) .mapToObj(r -> new Record(Map.of( "key1", new byte[]{(byte) r}, @@ -509,7 +509,7 @@ public void shouldSplitByteKeyOnSecondDimensionWhenAllValuesForFirstKeyAreTheSam .singlePartition("A") .buildList()); IntStream.range(0, 10) - .forEach(i -> ingestFileFromRecords(schema, stateStore, + .forEach(i -> ingestRecordsToSketchOnPartition(schema, stateStore, "A", IntStream.range(0, 100) .mapToObj(r -> new Record(Map.of( "key1", new byte[]{(byte) -100}, @@ -579,16 +579,11 @@ private static void splitSinglePartition(StateStore stateStore, SplitPartition p } private void splitSinglePartition(Schema schema, StateStore stateStore, Supplier generateIds) throws Exception { - SplitPartition partitionSplitter = new SplitPartition(stateStore, schema, loadFromFile(schema, new Configuration()), generateIds); - splitSinglePartition(stateStore, partitionSplitter); - } - - private void splitSinglePartitionFromMap(Schema schema, StateStore stateStore, Supplier generateIds) throws Exception { - SplitPartition partitionSplitter = new SplitPartition(stateStore, schema, loadFromMap(schema, new Configuration()), generateIds); + SplitPartition partitionSplitter = new SplitPartition(stateStore, schema, loadFromMap(), generateIds); splitSinglePartition(stateStore, partitionSplitter); } - public SketchesLoader loadFromMap(Schema schema, Configuration conf) { + public SketchesLoader loadFromMap() { return (filename) -> fileToSketchMap.get(filename); } From 3c3a3c8013527911c722c8c9c21151f882712a1e Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Thu, 8 Aug 2024 14:38:05 +0000 Subject: [PATCH 04/16] Convert SplitPartitionIT to SplitPartitionTest --- ...rtitionIT.java => SplitPartitionTest.java} | 76 +++---------------- 1 file changed, 9 insertions(+), 67 deletions(-) rename java/splitter/splitter-core/src/test/java/sleeper/splitter/{SplitPartitionIT.java => SplitPartitionTest.java} (88%) diff --git a/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionIT.java b/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionTest.java similarity index 88% rename from java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionIT.java rename to java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionTest.java index ea671e8b51..7ceea2f36b 100644 --- a/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionIT.java +++ b/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionTest.java @@ -15,12 +15,9 @@ */ package sleeper.splitter; -import org.apache.hadoop.conf.Configuration; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; import sleeper.core.partition.Partition; import sleeper.core.partition.PartitionTree; @@ -35,21 +32,12 @@ import sleeper.core.statestore.FileReference; import sleeper.core.statestore.FileReferenceFactory; import sleeper.core.statestore.StateStore; -import sleeper.ingest.IngestRecordsFromIterator; -import sleeper.ingest.impl.IngestCoordinator; -import sleeper.ingest.impl.ParquetConfiguration; -import sleeper.ingest.impl.partitionfilewriter.DirectPartitionFileWriterFactory; -import sleeper.ingest.impl.recordbatch.arraylist.ArrayListRecordBatchFactory; -import sleeper.ingest.testutils.IngestCoordinatorTestHelper; import sleeper.sketches.Sketches; import sleeper.splitter.SplitMultiDimensionalPartitionImpl.SketchesLoader; -import java.io.IOException; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.UUID; @@ -60,27 +48,14 @@ import java.util.stream.LongStream; import java.util.stream.Stream; -import static java.nio.file.Files.createTempDirectory; import static org.assertj.core.api.Assertions.assertThat; import static sleeper.core.statestore.inmemory.StateStoreTestHelper.inMemoryStateStoreWithPartitions; -import static sleeper.splitter.SplitMultiDimensionalPartitionImpl.loadFromFile; - -public class SplitPartitionIT { - @TempDir - public Path folder; +public class SplitPartitionTest { private final Field field = new Field("key", new IntType()); private final Schema schema = Schema.builder().rowKeyFields(field).build(); private Map fileToSketchMap = new HashMap<>(); - private String localDir; - private String filePathPrefix; - - @BeforeEach - void setUp() throws IOException { - localDir = createTempDirectory(folder, null).toString(); - filePathPrefix = createTempDirectory(folder, null).toString(); - } @Nested @DisplayName("Skip split") @@ -106,7 +81,7 @@ public void shouldNotSplitPartitionForIntKeyIfItCannotBeSplitBecausePartitionIsO record.put("key", r); records.add(record); } - ingestFileFromRecords(schema, stateStore, records.stream()); + ingestRecordsToSketchOnPartition(schema, stateStore, partition.getId(), records.stream()); } } @@ -148,7 +123,7 @@ public void shouldNotSplitPartitionForIntKeyIfItCannotBeSplitBecauseDataIsConsta records.add(record); } } - ingestFileFromRecords(schema, stateStore, records.stream()); + ingestRecordsToSketchOnPartition(schema, stateStore, partition.getId(), records.stream()); } } @@ -175,11 +150,6 @@ public void shouldNotSplitPartitionForByteArrayKeyIfItCannotBeSplitBecausePartit for (Partition partition : tree.getAllPartitions()) { for (int i = 0; i < 10; i++) { List records = new ArrayList<>(); - for (int r = 0; r < 100; r++) { - Record record = new Record(); - record.put("key", new byte[]{(byte) r}); - records.add(record); - } if (partition.getId().equals("id1")) { int j = 0; for (byte r = (byte) 0; @@ -202,7 +172,7 @@ public void shouldNotSplitPartitionForByteArrayKeyIfItCannotBeSplitBecausePartit records.add(record); } } - ingestFileFromRecords(schema, stateStore, records.stream()); + ingestRecordsToSketchOnPartition(schema, stateStore, partition.getId(), records.stream()); } } @@ -251,7 +221,7 @@ public void shouldNotSplitPartitionForByteArrayKeyIfItCannotBeSplitBecauseDataIs records.add(record); } } - ingestFileFromRecords(schema, stateStore, records.stream()); + ingestRecordsToSketchOnPartition(schema, stateStore, partition.getId(), records.stream()); } } @@ -527,22 +497,6 @@ public void shouldSplitByteKeyOnSecondDimensionWhenAllValuesForFirstKeyAreTheSam } } - private static void ingestRecordsFromIterator( - Schema schema, StateStore stateStore, String localDir, - String filePathPrefix, Iterator recordIterator) throws Exception { - ParquetConfiguration parquetConfiguration = IngestCoordinatorTestHelper.parquetConfiguration(schema, new Configuration()); - IngestCoordinator ingestCoordinator = IngestCoordinatorTestHelper.standardIngestCoordinator( - stateStore, schema, - ArrayListRecordBatchFactory.builder() - .localWorkingDirectory(localDir) - .maxNoOfRecordsInMemory(1000000) - .maxNoOfRecordsInLocalStore(100000000) - .parquetConfiguration(parquetConfiguration) - .buildAcceptingRecords(), - DirectPartitionFileWriterFactory.from(parquetConfiguration, filePathPrefix)); - new IngestRecordsFromIterator(ingestCoordinator, recordIterator).write(); - } - private void ingestRecordsToSketchOnPartition(Schema schema, StateStore stateStore, String partitionId, Stream recordsStream) { Sketches sketches = Sketches.from(schema); AtomicLong recordCount = new AtomicLong(); @@ -562,39 +516,27 @@ private void ingestRecordsToSketchOnPartition(Schema schema, StateStore stateSto fileToSketchMap.put(recordFileReference.getFilename(), sketches); } - private void ingestFileFromRecords(Schema schema, StateStore stateStore, Stream recordsStream) { - try { - ingestRecordsFromIterator(schema, stateStore, localDir, filePathPrefix, recordsStream.iterator()); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - private static void splitSinglePartition(StateStore stateStore, SplitPartition partitionSplitter) throws Exception { + private void splitSinglePartition(Schema schema, StateStore stateStore, Supplier generateIds) throws Exception { Partition partition = stateStore.getAllPartitions().get(0); List fileNames = stateStore.getFileReferences().stream() .map(FileReference::getFilename) .collect(Collectors.toList()); - partitionSplitter.splitPartition(partition, fileNames); - } - - private void splitSinglePartition(Schema schema, StateStore stateStore, Supplier generateIds) throws Exception { SplitPartition partitionSplitter = new SplitPartition(stateStore, schema, loadFromMap(), generateIds); - splitSinglePartition(stateStore, partitionSplitter); + partitionSplitter.splitPartition(partition, fileNames); } public SketchesLoader loadFromMap() { return (filename) -> fileToSketchMap.get(filename); } - private static void splitPartition(Schema schema, StateStore stateStore, String partitionId, Supplier generateIds) throws Exception { + private void splitPartition(Schema schema, StateStore stateStore, String partitionId, Supplier generateIds) throws Exception { PartitionTree tree = new PartitionTree(stateStore.getAllPartitions()); Partition partition = tree.getPartition(partitionId); List fileNames = stateStore.getFileReferences().stream() .filter(file -> partitionId.equals(file.getPartitionId())) .map(FileReference::getFilename) .collect(Collectors.toList()); - SplitPartition partitionSplitter = new SplitPartition(stateStore, schema, loadFromFile(schema, new Configuration()), generateIds); + SplitPartition partitionSplitter = new SplitPartition(stateStore, schema, loadFromMap(), generateIds); partitionSplitter.splitPartition(partition, fileNames); } From b51d2b241ca3eb15c530ccbdd5677b883b0ffa16 Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Thu, 8 Aug 2024 14:38:24 +0000 Subject: [PATCH 05/16] Make fileToSketchMap final --- .../src/test/java/sleeper/splitter/SplitPartitionTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionTest.java b/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionTest.java index 7ceea2f36b..db9b0f4e41 100644 --- a/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionTest.java +++ b/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionTest.java @@ -55,7 +55,7 @@ public class SplitPartitionTest { private final Field field = new Field("key", new IntType()); private final Schema schema = Schema.builder().rowKeyFields(field).build(); - private Map fileToSketchMap = new HashMap<>(); + private final Map fileToSketchMap = new HashMap<>(); @Nested @DisplayName("Skip split") From 3795193b4035fade9a8a58e3347081443de4dcb3 Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Thu, 8 Aug 2024 14:39:06 +0000 Subject: [PATCH 06/16] Rename SketchesLoader implementation methods --- .../splitter/SplitMultiDimensionalPartitionImpl.java | 2 +- .../src/main/java/sleeper/splitter/SplitPartition.java | 4 ++-- .../src/test/java/sleeper/splitter/SplitPartitionTest.java | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitMultiDimensionalPartitionImpl.java b/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitMultiDimensionalPartitionImpl.java index b14f479ba5..bf524007dc 100644 --- a/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitMultiDimensionalPartitionImpl.java +++ b/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitMultiDimensionalPartitionImpl.java @@ -302,7 +302,7 @@ public interface SketchesLoader { Sketches load(String filename) throws IOException; } - public static SketchesLoader loadFromFile(Schema schema, Configuration conf) { + public static SketchesLoader loadSketchesFromFile(Schema schema, Configuration conf) { return (filename) -> new SketchesSerDeToS3(schema).loadFromHadoopFS(new Path(filename), conf); } } diff --git a/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartition.java b/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartition.java index 5cf08b36c7..d591dbdcb0 100644 --- a/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartition.java +++ b/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartition.java @@ -28,7 +28,7 @@ import java.util.UUID; import java.util.function.Supplier; -import static sleeper.splitter.SplitMultiDimensionalPartitionImpl.loadFromFile; +import static sleeper.splitter.SplitMultiDimensionalPartitionImpl.loadSketchesFromFile; /** * Splits a partition. Delegates to {@link SplitMultiDimensionalPartitionImpl}. @@ -42,7 +42,7 @@ public class SplitPartition { public SplitPartition(StateStore stateStore, Schema schema, Configuration conf) { - this(stateStore, schema, loadFromFile(schema, conf)); + this(stateStore, schema, loadSketchesFromFile(schema, conf)); } public SplitPartition(StateStore stateStore, diff --git a/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionTest.java b/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionTest.java index db9b0f4e41..af658bdd69 100644 --- a/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionTest.java +++ b/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionTest.java @@ -521,11 +521,11 @@ private void splitSinglePartition(Schema schema, StateStore stateStore, Supplier List fileNames = stateStore.getFileReferences().stream() .map(FileReference::getFilename) .collect(Collectors.toList()); - SplitPartition partitionSplitter = new SplitPartition(stateStore, schema, loadFromMap(), generateIds); + SplitPartition partitionSplitter = new SplitPartition(stateStore, schema, loadSketchesFromMap(), generateIds); partitionSplitter.splitPartition(partition, fileNames); } - public SketchesLoader loadFromMap() { + public SketchesLoader loadSketchesFromMap() { return (filename) -> fileToSketchMap.get(filename); } @@ -536,7 +536,7 @@ private void splitPartition(Schema schema, StateStore stateStore, String partiti .filter(file -> partitionId.equals(file.getPartitionId())) .map(FileReference::getFilename) .collect(Collectors.toList()); - SplitPartition partitionSplitter = new SplitPartition(stateStore, schema, loadFromMap(), generateIds); + SplitPartition partitionSplitter = new SplitPartition(stateStore, schema, loadSketchesFromMap(), generateIds); partitionSplitter.splitPartition(partition, fileNames); } From e19df499a68fd661d633b9174a7689111beb39de Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Thu, 8 Aug 2024 14:39:28 +0000 Subject: [PATCH 07/16] Reorder methods --- .../test/java/sleeper/splitter/SplitPartitionTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionTest.java b/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionTest.java index af658bdd69..3c882d9ccf 100644 --- a/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionTest.java +++ b/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionTest.java @@ -525,10 +525,6 @@ private void splitSinglePartition(Schema schema, StateStore stateStore, Supplier partitionSplitter.splitPartition(partition, fileNames); } - public SketchesLoader loadSketchesFromMap() { - return (filename) -> fileToSketchMap.get(filename); - } - private void splitPartition(Schema schema, StateStore stateStore, String partitionId, Supplier generateIds) throws Exception { PartitionTree tree = new PartitionTree(stateStore.getAllPartitions()); Partition partition = tree.getPartition(partitionId); @@ -540,6 +536,10 @@ private void splitPartition(Schema schema, StateStore stateStore, String partiti partitionSplitter.splitPartition(partition, fileNames); } + public SketchesLoader loadSketchesFromMap() { + return (filename) -> fileToSketchMap.get(filename); + } + private static Supplier generateIds(String... ids) { return Arrays.stream(ids).iterator()::next; } From 123c9d7dff7ebd87b2ad77322c17db8bfc73ddb5 Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Thu, 8 Aug 2024 14:49:00 +0000 Subject: [PATCH 08/16] Extract FindPartitionSplitPoint --- .../splitter/FindPartitionSplitPoint.java | 201 ++++++++++++++++++ .../SplitMultiDimensionalPartitionImpl.java | 192 +---------------- .../java/sleeper/splitter/SplitPartition.java | 12 +- 3 files changed, 219 insertions(+), 186 deletions(-) create mode 100644 java/splitter/splitter-core/src/main/java/sleeper/splitter/FindPartitionSplitPoint.java diff --git a/java/splitter/splitter-core/src/main/java/sleeper/splitter/FindPartitionSplitPoint.java b/java/splitter/splitter-core/src/main/java/sleeper/splitter/FindPartitionSplitPoint.java new file mode 100644 index 0000000000..ed442394a3 --- /dev/null +++ b/java/splitter/splitter-core/src/main/java/sleeper/splitter/FindPartitionSplitPoint.java @@ -0,0 +1,201 @@ +/* + * Copyright 2022-2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package sleeper.splitter; + +import com.facebook.collections.ByteArray; +import org.apache.commons.lang3.tuple.ImmutableTriple; +import org.apache.commons.lang3.tuple.Triple; +import org.apache.datasketches.quantiles.ItemsSketch; +import org.apache.datasketches.quantiles.ItemsUnion; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import sleeper.core.schema.Schema; +import sleeper.core.schema.type.ByteArrayType; +import sleeper.core.schema.type.IntType; +import sleeper.core.schema.type.LongType; +import sleeper.core.schema.type.PrimitiveType; +import sleeper.core.schema.type.StringType; +import sleeper.sketches.Sketches; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; + +public class FindPartitionSplitPoint { + + public static final Logger LOGGER = LoggerFactory.getLogger(FindPartitionSplitPoint.class); + + private final Schema schema; + private final List rowKeyTypes; + private final List fileNames; + private final SketchesLoader sketchesLoader; + + public FindPartitionSplitPoint(Schema schema, List fileNames, SketchesLoader sketchesLoader) { + this.schema = schema; + this.rowKeyTypes = schema.getRowKeyTypes(); + this.fileNames = fileNames; + this.sketchesLoader = sketchesLoader; + } + + public Optional splitPointForDimension(int dimension) throws IOException { + PrimitiveType rowKeyType = rowKeyTypes.get(dimension); + LOGGER.info("Testing field {} of type {} (dimension {}) to see if it can be split", + schema.getRowKeyFieldNames().get(dimension), rowKeyType, dimension); + if (rowKeyType instanceof IntType) { + return splitPointForDimension(getMinMedianMaxIntKey(dimension), dimension); + } else if (rowKeyType instanceof LongType) { + return splitPointForDimension(getMinMedianMaxLongKey(dimension), dimension); + } else if (rowKeyType instanceof StringType) { + return splitPointForDimension(getMinMedianMaxStringKey(dimension), dimension); + } else if (rowKeyType instanceof ByteArrayType) { + return splitPointForDimension(getMinMedianMaxByteArrayKey(dimension), dimension, ByteArray::getArray); + } else { + throw new IllegalArgumentException("Unknown type " + rowKeyType); + } + } + + private > Optional splitPointForDimension( + Triple minMedianMax, int dimension) { + return splitPointForDimension(minMedianMax, dimension, median -> median); + } + + private > Optional splitPointForDimension( + Triple minMedianMax, int dimension, Function getValue) { + T min = minMedianMax.getLeft(); + T median = minMedianMax.getMiddle(); + T max = minMedianMax.getRight(); + LOGGER.debug("Min = {}, median = {}, max = {}", min, median, max); + if (min.compareTo(max) > 0) { + throw new IllegalStateException("Min > max"); + } + if (min.compareTo(median) < 0 && median.compareTo(max) < 0) { + LOGGER.debug("For dimension {} min < median && median < max", dimension); + return Optional.of(getValue.apply(median)); + } else { + LOGGER.info("For dimension {} it is not true that min < median && median < max, so NOT splitting", dimension); + return Optional.empty(); + } + } + + private Triple getMinMedianMaxIntKey(int dimension) throws IOException { + String keyField = schema.getRowKeyFields().get(dimension).getName(); + + // Read all sketches + List> sketchList = new ArrayList<>(); + for (String fileName : fileNames) { + String sketchesFile = fileName.replace(".parquet", ".sketches"); + LOGGER.info("Loading Sketches from {}", sketchesFile); + Sketches sketches = sketchesLoader.load(sketchesFile); + sketchList.add(sketches.getQuantilesSketch(keyField)); + } + + // Union all the sketches + ItemsUnion union = ItemsUnion.getInstance(16384, Comparator.naturalOrder()); + for (ItemsSketch s : sketchList) { + union.update(s); + } + ItemsSketch sketch = union.getResult(); + + Integer min = sketch.getMinValue(); + Integer median = sketch.getQuantile(0.5D); + Integer max = sketch.getMaxValue(); + return new ImmutableTriple<>(min, median, max); + } + + private Triple getMinMedianMaxLongKey(int dimension) throws IOException { + String keyField = schema.getRowKeyFields().get(dimension).getName(); + + // Read all sketches + List> sketchList = new ArrayList<>(); + for (String fileName : fileNames) { + String sketchesFile = fileName.replace(".parquet", ".sketches"); + LOGGER.info("Loading Sketches from {}", sketchesFile); + Sketches sketches = sketchesLoader.load(sketchesFile); + sketchList.add(sketches.getQuantilesSketch(keyField)); + } + + // Union all the sketches + ItemsUnion union = ItemsUnion.getInstance(16384, Comparator.naturalOrder()); + for (ItemsSketch s : sketchList) { + union.update(s); + } + ItemsSketch sketch = union.getResult(); + + Long min = sketch.getMinValue(); + Long median = sketch.getQuantile(0.5D); + Long max = sketch.getMaxValue(); + return new ImmutableTriple<>(min, median, max); + } + + private Triple getMinMedianMaxStringKey(int dimension) throws IOException { + String keyField = schema.getRowKeyFields().get(dimension).getName(); + + // Read all sketches + List> sketchList = new ArrayList<>(); + for (String fileName : fileNames) { + String sketchesFile = fileName.replace(".parquet", ".sketches"); + LOGGER.info("Loading Sketches from {}", sketchesFile); + Sketches sketches = sketchesLoader.load(sketchesFile); + sketchList.add(sketches.getQuantilesSketch(keyField)); + } + + // Union all the sketches + ItemsUnion union = ItemsUnion.getInstance(16384, Comparator.naturalOrder()); + for (ItemsSketch s : sketchList) { + union.update(s); + } + ItemsSketch sketch = union.getResult(); + + String min = sketch.getMinValue(); + String median = sketch.getQuantile(0.5D); + String max = sketch.getMaxValue(); + return new ImmutableTriple<>(min, median, max); + } + + private Triple getMinMedianMaxByteArrayKey(int dimension) throws IOException { + String keyField = schema.getRowKeyFields().get(dimension).getName(); + + // Read all sketches + List> sketchList = new ArrayList<>(); + for (String fileName : fileNames) { + String sketchesFile = fileName.replace(".parquet", ".sketches"); + LOGGER.info("Loading Sketches from {}", sketchesFile); + Sketches sketches = sketchesLoader.load(sketchesFile); + sketchList.add(sketches.getQuantilesSketch(keyField)); + } + + // Union all the sketches + ItemsUnion union = ItemsUnion.getInstance(16384, Comparator.naturalOrder()); + for (ItemsSketch s : sketchList) { + union.update(s); + } + ItemsSketch sketch = union.getResult(); + + ByteArray min = sketch.getMinValue(); + ByteArray median = sketch.getQuantile(0.5D); + ByteArray max = sketch.getMaxValue(); + return new ImmutableTriple<>(min, median, max); + } + + public interface SketchesLoader { + Sketches load(String filename) throws IOException; + } + +} diff --git a/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitMultiDimensionalPartitionImpl.java b/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitMultiDimensionalPartitionImpl.java index bf524007dc..e7e921f7f7 100644 --- a/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitMultiDimensionalPartitionImpl.java +++ b/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitMultiDimensionalPartitionImpl.java @@ -15,11 +15,6 @@ */ package sleeper.splitter; -import com.facebook.collections.ByteArray; -import org.apache.commons.lang3.tuple.ImmutableTriple; -import org.apache.commons.lang3.tuple.Triple; -import org.apache.datasketches.quantiles.ItemsSketch; -import org.apache.datasketches.quantiles.ItemsUnion; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; @@ -31,11 +26,6 @@ import sleeper.core.range.Region; import sleeper.core.schema.Field; import sleeper.core.schema.Schema; -import sleeper.core.schema.type.ByteArrayType; -import sleeper.core.schema.type.IntType; -import sleeper.core.schema.type.LongType; -import sleeper.core.schema.type.PrimitiveType; -import sleeper.core.schema.type.StringType; import sleeper.core.statestore.StateStore; import sleeper.core.statestore.StateStoreException; import sleeper.sketches.Sketches; @@ -44,10 +34,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Comparator; import java.util.List; -import java.util.Optional; -import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -73,186 +60,17 @@ public class SplitMultiDimensionalPartitionImpl { private final StateStore stateStore; private final Schema schema; - private final List rowKeyTypes; - private final Partition partition; - private final List fileNames; // These should be active files for the partition private final RangeFactory rangeFactory; private final Supplier idSupplier; - private final SketchesLoader sketchesLoader; - public SplitMultiDimensionalPartitionImpl(StateStore stateStore, - Schema schema, - Partition partition, - List fileNames, - Supplier idSupplier, - SketchesLoader sketchesLoader) { + public SplitMultiDimensionalPartitionImpl(StateStore stateStore, Schema schema, Supplier idSupplier) { this.stateStore = stateStore; this.schema = schema; - this.rowKeyTypes = schema.getRowKeyTypes(); - this.partition = partition; - this.fileNames = fileNames; this.rangeFactory = new RangeFactory(schema); this.idSupplier = idSupplier; - this.sketchesLoader = sketchesLoader; } - void splitPartition() throws StateStoreException, IOException { - for (int dimension = 0; dimension < rowKeyTypes.size(); dimension++) { - Optional splitPointOpt = splitPointForDimension(dimension); - if (splitPointOpt.isPresent()) { - splitPartition(partition, splitPointOpt.get(), dimension); - return; - } - } - } - - public Optional splitPointForDimension(int dimension) throws IOException { - PrimitiveType rowKeyType = rowKeyTypes.get(dimension); - LOGGER.info("Testing field {} of type {} (dimension {}) to see if it can be split", - schema.getRowKeyFieldNames().get(dimension), rowKeyType, dimension); - if (rowKeyType instanceof IntType) { - return splitPointForDimension(getMinMedianMaxIntKey(dimension), dimension); - } else if (rowKeyType instanceof LongType) { - return splitPointForDimension(getMinMedianMaxLongKey(dimension), dimension); - } else if (rowKeyType instanceof StringType) { - return splitPointForDimension(getMinMedianMaxStringKey(dimension), dimension); - } else if (rowKeyType instanceof ByteArrayType) { - return splitPointForDimension(getMinMedianMaxByteArrayKey(dimension), dimension, ByteArray::getArray); - } else { - throw new IllegalArgumentException("Unknown type " + rowKeyType); - } - } - - private > Optional splitPointForDimension( - Triple minMedianMax, int dimension) { - return splitPointForDimension(minMedianMax, dimension, median -> median); - } - - private > Optional splitPointForDimension( - Triple minMedianMax, int dimension, Function getValue) { - T min = minMedianMax.getLeft(); - T median = minMedianMax.getMiddle(); - T max = minMedianMax.getRight(); - LOGGER.debug("Min = {}, median = {}, max = {}", min, median, max); - if (min.compareTo(max) > 0) { - throw new IllegalStateException("Min > max"); - } - if (min.compareTo(median) < 0 && median.compareTo(max) < 0) { - LOGGER.debug("For dimension {} min < median && median < max", dimension); - return Optional.of(getValue.apply(median)); - } else { - LOGGER.info("For dimension {} it is not true that min < median && median < max, so NOT splitting", dimension); - return Optional.empty(); - } - } - - private Triple getMinMedianMaxIntKey(int dimension) throws IOException { - String keyField = schema.getRowKeyFields().get(dimension).getName(); - - // Read all sketches - List> sketchList = new ArrayList<>(); - for (String fileName : fileNames) { - String sketchesFile = fileName.replace(".parquet", ".sketches"); - LOGGER.info("Loading Sketches from {}", sketchesFile); - Sketches sketches = sketchesLoader.load(sketchesFile); - sketchList.add(sketches.getQuantilesSketch(keyField)); - } - - // Union all the sketches - ItemsUnion union = ItemsUnion.getInstance(16384, Comparator.naturalOrder()); - for (ItemsSketch s : sketchList) { - union.update(s); - } - ItemsSketch sketch = union.getResult(); - - Integer min = sketch.getMinValue(); - Integer median = sketch.getQuantile(0.5D); - Integer max = sketch.getMaxValue(); - return new ImmutableTriple<>(min, median, max); - } - - private Triple getMinMedianMaxLongKey(int dimension) throws IOException { - String keyField = schema.getRowKeyFields().get(dimension).getName(); - - // Read all sketches - List> sketchList = new ArrayList<>(); - for (String fileName : fileNames) { - String sketchesFile = fileName.replace(".parquet", ".sketches"); - LOGGER.info("Loading Sketches from {}", sketchesFile); - Sketches sketches = sketchesLoader.load(sketchesFile); - sketchList.add(sketches.getQuantilesSketch(keyField)); - } - - // Union all the sketches - ItemsUnion union = ItemsUnion.getInstance(16384, Comparator.naturalOrder()); - for (ItemsSketch s : sketchList) { - union.update(s); - } - ItemsSketch sketch = union.getResult(); - - Long min = sketch.getMinValue(); - Long median = sketch.getQuantile(0.5D); - Long max = sketch.getMaxValue(); - return new ImmutableTriple<>(min, median, max); - } - - private Triple getMinMedianMaxStringKey(int dimension) throws IOException { - String keyField = schema.getRowKeyFields().get(dimension).getName(); - - // Read all sketches - List> sketchList = new ArrayList<>(); - for (String fileName : fileNames) { - String sketchesFile = fileName.replace(".parquet", ".sketches"); - LOGGER.info("Loading Sketches from {}", sketchesFile); - Sketches sketches = sketchesLoader.load(sketchesFile); - sketchList.add(sketches.getQuantilesSketch(keyField)); - } - - // Union all the sketches - ItemsUnion union = ItemsUnion.getInstance(16384, Comparator.naturalOrder()); - for (ItemsSketch s : sketchList) { - union.update(s); - } - ItemsSketch sketch = union.getResult(); - - String min = sketch.getMinValue(); - String median = sketch.getQuantile(0.5D); - String max = sketch.getMaxValue(); - return new ImmutableTriple<>(min, median, max); - } - - private Triple getMinMedianMaxByteArrayKey(int dimension) throws IOException { - String keyField = schema.getRowKeyFields().get(dimension).getName(); - - // Read all sketches - List> sketchList = new ArrayList<>(); - for (String fileName : fileNames) { - String sketchesFile = fileName.replace(".parquet", ".sketches"); - LOGGER.info("Loading Sketches from {}", sketchesFile); - Sketches sketches = sketchesLoader.load(sketchesFile); - sketchList.add(sketches.getQuantilesSketch(keyField)); - } - - // Union all the sketches - ItemsUnion union = ItemsUnion.getInstance(16384, Comparator.naturalOrder()); - for (ItemsSketch s : sketchList) { - union.update(s); - } - ItemsSketch sketch = union.getResult(); - - ByteArray min = sketch.getMinValue(); - ByteArray median = sketch.getQuantile(0.5D); - ByteArray max = sketch.getMaxValue(); - return new ImmutableTriple<>(min, median, max); - } - - private List removeRange(List inputRanges, String rangeToRemove) { - return inputRanges.stream() - .filter(r -> !r.getFieldName().equals(rangeToRemove)) - .collect(Collectors.toList()); - } - - private void splitPartition(Partition partition, Object splitPoint, int dimension) throws StateStoreException { + public void splitPartition(Partition partition, Object splitPoint, int dimension) throws StateStoreException { Field fieldToSplitOn = schema.getRowKeyFields().get(dimension); LOGGER.info("Splitting partition {} on split point {} in dimension {}", partition.getId(), splitPoint, dimension); @@ -298,6 +116,12 @@ private void splitPartition(Partition partition, Object splitPoint, int dimensio stateStore.atomicallyUpdatePartitionAndCreateNewOnes(partition, leftChild, rightChild); } + private List removeRange(List inputRanges, String rangeToRemove) { + return inputRanges.stream() + .filter(r -> !r.getFieldName().equals(rangeToRemove)) + .collect(Collectors.toList()); + } + public interface SketchesLoader { Sketches load(String filename) throws IOException; } diff --git a/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartition.java b/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartition.java index d591dbdcb0..68ccc2fa86 100644 --- a/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartition.java +++ b/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartition.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.function.Supplier; @@ -62,7 +63,14 @@ public SplitPartition(StateStore stateStore, } public void splitPartition(Partition partition, List fileNames) throws StateStoreException, IOException { - new SplitMultiDimensionalPartitionImpl(stateStore, schema, partition, fileNames, idSupplier, sketchesLoader) - .splitPartition(); + SplitMultiDimensionalPartitionImpl impl = new SplitMultiDimensionalPartitionImpl(stateStore, schema, idSupplier); + FindPartitionSplitPoint findSplitPoint = new FindPartitionSplitPoint(schema, fileNames, sketchesLoader::load); + for (int dimension = 0; dimension < schema.getRowKeyFields().size(); dimension++) { + Optional splitPointOpt = findSplitPoint.splitPointForDimension(dimension); + if (splitPointOpt.isPresent()) { + impl.splitPartition(partition, splitPointOpt.get(), dimension); + return; + } + } } } From 698f34351e4728cfc294ab8f0647a35abd955e64 Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Thu, 8 Aug 2024 14:59:34 +0000 Subject: [PATCH 09/16] Move state store update out of partition split factory and rename --- .../splitter/FindPartitionSplitPoint.java | 10 +++++ .../java/sleeper/splitter/SplitPartition.java | 21 ++++++--- .../splitter/SplitPartitionResult.java | 43 +++++++++++++++++++ ....java => SplitPartitionResultFactory.java} | 26 +++-------- .../sleeper/splitter/SplitPartitionTest.java | 2 +- 5 files changed, 75 insertions(+), 27 deletions(-) create mode 100644 java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartitionResult.java rename java/splitter/splitter-core/src/main/java/sleeper/splitter/{SplitMultiDimensionalPartitionImpl.java => SplitPartitionResultFactory.java} (82%) diff --git a/java/splitter/splitter-core/src/main/java/sleeper/splitter/FindPartitionSplitPoint.java b/java/splitter/splitter-core/src/main/java/sleeper/splitter/FindPartitionSplitPoint.java index ed442394a3..e9a3a7f2a1 100644 --- a/java/splitter/splitter-core/src/main/java/sleeper/splitter/FindPartitionSplitPoint.java +++ b/java/splitter/splitter-core/src/main/java/sleeper/splitter/FindPartitionSplitPoint.java @@ -20,6 +20,8 @@ import org.apache.commons.lang3.tuple.Triple; import org.apache.datasketches.quantiles.ItemsSketch; import org.apache.datasketches.quantiles.ItemsUnion; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +32,7 @@ import sleeper.core.schema.type.PrimitiveType; import sleeper.core.schema.type.StringType; import sleeper.sketches.Sketches; +import sleeper.sketches.s3.SketchesSerDeToS3; import java.io.IOException; import java.util.ArrayList; @@ -38,6 +41,9 @@ import java.util.Optional; import java.util.function.Function; +/** + * Finds a split point for a partition by examining the sketches for each file. + */ public class FindPartitionSplitPoint { public static final Logger LOGGER = LoggerFactory.getLogger(FindPartitionSplitPoint.class); @@ -198,4 +204,8 @@ public interface SketchesLoader { Sketches load(String filename) throws IOException; } + public static SketchesLoader loadSketchesFromFile(Schema schema, Configuration conf) { + return (filename) -> new SketchesSerDeToS3(schema).loadFromHadoopFS(new Path(filename), conf); + } + } diff --git a/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartition.java b/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartition.java index 68ccc2fa86..a06f729528 100644 --- a/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartition.java +++ b/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartition.java @@ -21,7 +21,7 @@ import sleeper.core.schema.Schema; import sleeper.core.statestore.StateStore; import sleeper.core.statestore.StateStoreException; -import sleeper.splitter.SplitMultiDimensionalPartitionImpl.SketchesLoader; +import sleeper.splitter.FindPartitionSplitPoint.SketchesLoader; import java.io.IOException; import java.util.List; @@ -29,10 +29,20 @@ import java.util.UUID; import java.util.function.Supplier; -import static sleeper.splitter.SplitMultiDimensionalPartitionImpl.loadSketchesFromFile; +import static sleeper.splitter.FindPartitionSplitPoint.loadSketchesFromFile; /** - * Splits a partition. Delegates to {@link SplitMultiDimensionalPartitionImpl}. + * Splits a partition. Identifies the median value of the first dimension. If that leads to a valid split (i.e. one + * where it is not equal to the minimum value and not equal to the maximum value) then that is used to split the + * partition. If it doesn't lead to a valid split then the above is repeated for the second dimension. This continues + * until either a valid split is found or no split is possible. + *

+ * Note that there are two situations in which a partition cannot be split: + * - If the partition consists of a single point (i.e. the minimum equals the maximum). + * - If the median equals the minimum then the partition cannot be split. + * This is because it would have to be split into [min, median) and [median, max), but if the min equals the median then + * the left one can't have any data in it as a key x in it would have to have min <= x < median = min which is a + * contradiction. */ public class SplitPartition { private final StateStore stateStore; @@ -63,12 +73,13 @@ public SplitPartition(StateStore stateStore, } public void splitPartition(Partition partition, List fileNames) throws StateStoreException, IOException { - SplitMultiDimensionalPartitionImpl impl = new SplitMultiDimensionalPartitionImpl(stateStore, schema, idSupplier); FindPartitionSplitPoint findSplitPoint = new FindPartitionSplitPoint(schema, fileNames, sketchesLoader::load); for (int dimension = 0; dimension < schema.getRowKeyFields().size(); dimension++) { Optional splitPointOpt = findSplitPoint.splitPointForDimension(dimension); if (splitPointOpt.isPresent()) { - impl.splitPartition(partition, splitPointOpt.get(), dimension); + SplitPartitionResult result = new SplitPartitionResultFactory(schema, idSupplier) + .splitPartition(partition, splitPointOpt.get(), dimension); + stateStore.atomicallyUpdatePartitionAndCreateNewOnes(result.getParentPartition(), result.getLeftChild(), result.getRightChild()); return; } } diff --git a/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartitionResult.java b/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartitionResult.java new file mode 100644 index 0000000000..12f0e29b7a --- /dev/null +++ b/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartitionResult.java @@ -0,0 +1,43 @@ +/* + * Copyright 2022-2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package sleeper.splitter; + +import sleeper.core.partition.Partition; + +public class SplitPartitionResult { + + private final Partition parentPartition; + private final Partition leftChild; + private final Partition rightChild; + + public SplitPartitionResult(Partition parentPartition, Partition leftChild, Partition rightChild) { + this.parentPartition = parentPartition; + this.leftChild = leftChild; + this.rightChild = rightChild; + } + + public Partition getParentPartition() { + return parentPartition; + } + + public Partition getLeftChild() { + return leftChild; + } + + public Partition getRightChild() { + return rightChild; + } +} diff --git a/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitMultiDimensionalPartitionImpl.java b/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartitionResultFactory.java similarity index 82% rename from java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitMultiDimensionalPartitionImpl.java rename to java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartitionResultFactory.java index e7e921f7f7..41603bf070 100644 --- a/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitMultiDimensionalPartitionImpl.java +++ b/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartitionResultFactory.java @@ -15,8 +15,6 @@ */ package sleeper.splitter; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,12 +24,8 @@ import sleeper.core.range.Region; import sleeper.core.schema.Field; import sleeper.core.schema.Schema; -import sleeper.core.statestore.StateStore; import sleeper.core.statestore.StateStoreException; -import sleeper.sketches.Sketches; -import sleeper.sketches.s3.SketchesSerDeToS3; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -55,22 +49,20 @@ * contradiction. *

*/ -public class SplitMultiDimensionalPartitionImpl { - private static final Logger LOGGER = LoggerFactory.getLogger(SplitMultiDimensionalPartitionImpl.class); +public class SplitPartitionResultFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(SplitPartitionResultFactory.class); - private final StateStore stateStore; private final Schema schema; private final RangeFactory rangeFactory; private final Supplier idSupplier; - public SplitMultiDimensionalPartitionImpl(StateStore stateStore, Schema schema, Supplier idSupplier) { - this.stateStore = stateStore; + public SplitPartitionResultFactory(Schema schema, Supplier idSupplier) { this.schema = schema; this.rangeFactory = new RangeFactory(schema); this.idSupplier = idSupplier; } - public void splitPartition(Partition partition, Object splitPoint, int dimension) throws StateStoreException { + public SplitPartitionResult splitPartition(Partition partition, Object splitPoint, int dimension) throws StateStoreException { Field fieldToSplitOn = schema.getRowKeyFields().get(dimension); LOGGER.info("Splitting partition {} on split point {} in dimension {}", partition.getId(), splitPoint, dimension); @@ -113,7 +105,7 @@ public void splitPartition(Partition partition, Object splitPoint, int dimension LOGGER.info("New partition: {}", leftChild); LOGGER.info("New partition: {}", rightChild); - stateStore.atomicallyUpdatePartitionAndCreateNewOnes(partition, leftChild, rightChild); + return new SplitPartitionResult(partition, leftChild, rightChild); } private List removeRange(List inputRanges, String rangeToRemove) { @@ -121,12 +113,4 @@ private List removeRange(List inputRanges, String rangeToRemove) { .filter(r -> !r.getFieldName().equals(rangeToRemove)) .collect(Collectors.toList()); } - - public interface SketchesLoader { - Sketches load(String filename) throws IOException; - } - - public static SketchesLoader loadSketchesFromFile(Schema schema, Configuration conf) { - return (filename) -> new SketchesSerDeToS3(schema).loadFromHadoopFS(new Path(filename), conf); - } } diff --git a/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionTest.java b/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionTest.java index 3c882d9ccf..75000cf87f 100644 --- a/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionTest.java +++ b/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionTest.java @@ -33,7 +33,7 @@ import sleeper.core.statestore.FileReferenceFactory; import sleeper.core.statestore.StateStore; import sleeper.sketches.Sketches; -import sleeper.splitter.SplitMultiDimensionalPartitionImpl.SketchesLoader; +import sleeper.splitter.FindPartitionSplitPoint.SketchesLoader; import java.util.ArrayList; import java.util.Arrays; From ce02735ce113a6747e2e5f29313a31edcb4bbabd Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Thu, 8 Aug 2024 15:03:30 +0000 Subject: [PATCH 10/16] Split splitter package into two --- .../athena/metadata/IteratorApplyingMetadataHandlerIT.java | 2 +- .../sleeper/athena/metadata/SleeperMetadataHandlerIT.java | 2 +- .../clients/status/report/DeadLettersStatusReport.java | 2 +- .../clients/status/report/partitions/PartitionStatus.java | 4 ++-- .../splitter/{ => find}/FindPartitionToSplitResult.java | 2 +- .../sleeper/splitter/{ => find}/FindPartitionsToSplit.java | 2 +- .../sleeper/splitter/{ => find}/PartitionSplitCheck.java | 2 +- .../splitter/{ => find}/SplitPartitionJobDefinition.java | 2 +- .../{ => find}/SplitPartitionJobDefinitionSerDe.java | 2 +- .../splitter/{ => split}/FindPartitionSplitPoint.java | 2 +- .../java/sleeper/splitter/{ => split}/SplitPartition.java | 6 +++--- .../sleeper/splitter/{ => split}/SplitPartitionResult.java | 2 +- .../splitter/{ => split}/SplitPartitionResultFactory.java | 2 +- .../splitter/{ => find}/FindPartitionsToSplitTest.java | 2 +- .../{ => find}/SplitPartitionJobDefinitionSerDeTest.java | 2 +- .../sleeper/splitter/{ => split}/SplitPartitionTest.java | 4 ++-- .../splitter/lambda/FindPartitionsToSplitLambda.java | 2 +- .../java/sleeper/splitter/lambda/SplitPartitionLambda.java | 6 +++--- .../sleeper/splitter/lambda/SqsSplitPartitionJobSender.java | 4 ++-- .../sleeper/splitter/lambda/FindPartitionsToSplitIT.java | 6 +++--- .../dsl/partitioning/WaitForPartitionSplitting.java | 4 ++-- .../testutil/drivers/InMemoryPartitionSplittingDriver.java | 6 +++--- 22 files changed, 34 insertions(+), 34 deletions(-) rename java/splitter/splitter-core/src/main/java/sleeper/splitter/{ => find}/FindPartitionToSplitResult.java (97%) rename java/splitter/splitter-core/src/main/java/sleeper/splitter/{ => find}/FindPartitionsToSplit.java (99%) rename java/splitter/splitter-core/src/main/java/sleeper/splitter/{ => find}/PartitionSplitCheck.java (98%) rename java/splitter/splitter-core/src/main/java/sleeper/splitter/{ => find}/SplitPartitionJobDefinition.java (98%) rename java/splitter/splitter-core/src/main/java/sleeper/splitter/{ => find}/SplitPartitionJobDefinitionSerDe.java (99%) rename java/splitter/splitter-core/src/main/java/sleeper/splitter/{ => split}/FindPartitionSplitPoint.java (99%) rename java/splitter/splitter-core/src/main/java/sleeper/splitter/{ => split}/SplitPartition.java (95%) rename java/splitter/splitter-core/src/main/java/sleeper/splitter/{ => split}/SplitPartitionResult.java (97%) rename java/splitter/splitter-core/src/main/java/sleeper/splitter/{ => split}/SplitPartitionResultFactory.java (99%) rename java/splitter/splitter-core/src/test/java/sleeper/splitter/{ => find}/FindPartitionsToSplitTest.java (99%) rename java/splitter/splitter-core/src/test/java/sleeper/splitter/{ => find}/SplitPartitionJobDefinitionSerDeTest.java (99%) rename java/splitter/splitter-core/src/test/java/sleeper/splitter/{ => split}/SplitPartitionTest.java (99%) diff --git a/java/athena/src/test/java/sleeper/athena/metadata/IteratorApplyingMetadataHandlerIT.java b/java/athena/src/test/java/sleeper/athena/metadata/IteratorApplyingMetadataHandlerIT.java index 63b8dacc76..53a27c6b6e 100644 --- a/java/athena/src/test/java/sleeper/athena/metadata/IteratorApplyingMetadataHandlerIT.java +++ b/java/athena/src/test/java/sleeper/athena/metadata/IteratorApplyingMetadataHandlerIT.java @@ -49,7 +49,7 @@ import sleeper.core.partition.Partition; import sleeper.core.schema.Field; import sleeper.core.statestore.StateStore; -import sleeper.splitter.SplitPartition; +import sleeper.splitter.split.SplitPartition; import sleeper.statestore.StateStoreFactory; import java.util.ArrayList; diff --git a/java/athena/src/test/java/sleeper/athena/metadata/SleeperMetadataHandlerIT.java b/java/athena/src/test/java/sleeper/athena/metadata/SleeperMetadataHandlerIT.java index 2beb36d0bb..6d3dce4e73 100644 --- a/java/athena/src/test/java/sleeper/athena/metadata/SleeperMetadataHandlerIT.java +++ b/java/athena/src/test/java/sleeper/athena/metadata/SleeperMetadataHandlerIT.java @@ -52,7 +52,7 @@ import sleeper.configuration.properties.table.TableProperties; import sleeper.core.partition.Partition; import sleeper.core.statestore.StateStore; -import sleeper.splitter.SplitPartition; +import sleeper.splitter.split.SplitPartition; import sleeper.statestore.StateStoreFactory; import java.io.IOException; diff --git a/java/clients/src/main/java/sleeper/clients/status/report/DeadLettersStatusReport.java b/java/clients/src/main/java/sleeper/clients/status/report/DeadLettersStatusReport.java index c3569ac7b0..b26ceaa3a0 100644 --- a/java/clients/src/main/java/sleeper/clients/status/report/DeadLettersStatusReport.java +++ b/java/clients/src/main/java/sleeper/clients/status/report/DeadLettersStatusReport.java @@ -30,7 +30,7 @@ import sleeper.configuration.properties.instance.InstanceProperties; import sleeper.configuration.properties.table.TablePropertiesProvider; import sleeper.query.model.QuerySerDe; -import sleeper.splitter.SplitPartitionJobDefinitionSerDe; +import sleeper.splitter.find.SplitPartitionJobDefinitionSerDe; import sleeper.task.common.QueueMessageCount; import java.io.IOException; diff --git a/java/clients/src/main/java/sleeper/clients/status/report/partitions/PartitionStatus.java b/java/clients/src/main/java/sleeper/clients/status/report/partitions/PartitionStatus.java index 72735cfacb..2cab530ad3 100644 --- a/java/clients/src/main/java/sleeper/clients/status/report/partitions/PartitionStatus.java +++ b/java/clients/src/main/java/sleeper/clients/status/report/partitions/PartitionStatus.java @@ -22,8 +22,8 @@ import sleeper.core.schema.Field; import sleeper.core.schema.Schema; import sleeper.core.statestore.FileReference; -import sleeper.splitter.FindPartitionsToSplit; -import sleeper.splitter.PartitionSplitCheck; +import sleeper.splitter.find.FindPartitionsToSplit; +import sleeper.splitter.find.PartitionSplitCheck; import java.util.List; import java.util.Set; diff --git a/java/splitter/splitter-core/src/main/java/sleeper/splitter/FindPartitionToSplitResult.java b/java/splitter/splitter-core/src/main/java/sleeper/splitter/find/FindPartitionToSplitResult.java similarity index 97% rename from java/splitter/splitter-core/src/main/java/sleeper/splitter/FindPartitionToSplitResult.java rename to java/splitter/splitter-core/src/main/java/sleeper/splitter/find/FindPartitionToSplitResult.java index 6b7af79eef..463ec20bb6 100644 --- a/java/splitter/splitter-core/src/main/java/sleeper/splitter/FindPartitionToSplitResult.java +++ b/java/splitter/splitter-core/src/main/java/sleeper/splitter/find/FindPartitionToSplitResult.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package sleeper.splitter; +package sleeper.splitter.find; import sleeper.core.partition.Partition; import sleeper.core.statestore.FileReference; diff --git a/java/splitter/splitter-core/src/main/java/sleeper/splitter/FindPartitionsToSplit.java b/java/splitter/splitter-core/src/main/java/sleeper/splitter/find/FindPartitionsToSplit.java similarity index 99% rename from java/splitter/splitter-core/src/main/java/sleeper/splitter/FindPartitionsToSplit.java rename to java/splitter/splitter-core/src/main/java/sleeper/splitter/find/FindPartitionsToSplit.java index 9d208d0a1c..c3f911483e 100644 --- a/java/splitter/splitter-core/src/main/java/sleeper/splitter/FindPartitionsToSplit.java +++ b/java/splitter/splitter-core/src/main/java/sleeper/splitter/find/FindPartitionsToSplit.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package sleeper.splitter; +package sleeper.splitter.find; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/java/splitter/splitter-core/src/main/java/sleeper/splitter/PartitionSplitCheck.java b/java/splitter/splitter-core/src/main/java/sleeper/splitter/find/PartitionSplitCheck.java similarity index 98% rename from java/splitter/splitter-core/src/main/java/sleeper/splitter/PartitionSplitCheck.java rename to java/splitter/splitter-core/src/main/java/sleeper/splitter/find/PartitionSplitCheck.java index d34a1b98d6..b5ffc284ab 100644 --- a/java/splitter/splitter-core/src/main/java/sleeper/splitter/PartitionSplitCheck.java +++ b/java/splitter/splitter-core/src/main/java/sleeper/splitter/find/PartitionSplitCheck.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package sleeper.splitter; +package sleeper.splitter.find; import sleeper.configuration.properties.table.TableProperties; import sleeper.core.statestore.FileReference; diff --git a/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartitionJobDefinition.java b/java/splitter/splitter-core/src/main/java/sleeper/splitter/find/SplitPartitionJobDefinition.java similarity index 98% rename from java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartitionJobDefinition.java rename to java/splitter/splitter-core/src/main/java/sleeper/splitter/find/SplitPartitionJobDefinition.java index ae82d1c3e1..6344a2a568 100644 --- a/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartitionJobDefinition.java +++ b/java/splitter/splitter-core/src/main/java/sleeper/splitter/find/SplitPartitionJobDefinition.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package sleeper.splitter; +package sleeper.splitter.find; import sleeper.core.partition.Partition; diff --git a/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartitionJobDefinitionSerDe.java b/java/splitter/splitter-core/src/main/java/sleeper/splitter/find/SplitPartitionJobDefinitionSerDe.java similarity index 99% rename from java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartitionJobDefinitionSerDe.java rename to java/splitter/splitter-core/src/main/java/sleeper/splitter/find/SplitPartitionJobDefinitionSerDe.java index df0ef73244..1e8fc95f52 100644 --- a/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartitionJobDefinitionSerDe.java +++ b/java/splitter/splitter-core/src/main/java/sleeper/splitter/find/SplitPartitionJobDefinitionSerDe.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package sleeper.splitter; +package sleeper.splitter.find; import com.google.gson.Gson; import com.google.gson.GsonBuilder; diff --git a/java/splitter/splitter-core/src/main/java/sleeper/splitter/FindPartitionSplitPoint.java b/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/FindPartitionSplitPoint.java similarity index 99% rename from java/splitter/splitter-core/src/main/java/sleeper/splitter/FindPartitionSplitPoint.java rename to java/splitter/splitter-core/src/main/java/sleeper/splitter/split/FindPartitionSplitPoint.java index e9a3a7f2a1..4496d78d31 100644 --- a/java/splitter/splitter-core/src/main/java/sleeper/splitter/FindPartitionSplitPoint.java +++ b/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/FindPartitionSplitPoint.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package sleeper.splitter; +package sleeper.splitter.split; import com.facebook.collections.ByteArray; import org.apache.commons.lang3.tuple.ImmutableTriple; diff --git a/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartition.java b/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/SplitPartition.java similarity index 95% rename from java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartition.java rename to java/splitter/splitter-core/src/main/java/sleeper/splitter/split/SplitPartition.java index a06f729528..56898c991c 100644 --- a/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartition.java +++ b/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/SplitPartition.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package sleeper.splitter; +package sleeper.splitter.split; import org.apache.hadoop.conf.Configuration; @@ -21,7 +21,7 @@ import sleeper.core.schema.Schema; import sleeper.core.statestore.StateStore; import sleeper.core.statestore.StateStoreException; -import sleeper.splitter.FindPartitionSplitPoint.SketchesLoader; +import sleeper.splitter.split.FindPartitionSplitPoint.SketchesLoader; import java.io.IOException; import java.util.List; @@ -29,7 +29,7 @@ import java.util.UUID; import java.util.function.Supplier; -import static sleeper.splitter.FindPartitionSplitPoint.loadSketchesFromFile; +import static sleeper.splitter.split.FindPartitionSplitPoint.loadSketchesFromFile; /** * Splits a partition. Identifies the median value of the first dimension. If that leads to a valid split (i.e. one diff --git a/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartitionResult.java b/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/SplitPartitionResult.java similarity index 97% rename from java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartitionResult.java rename to java/splitter/splitter-core/src/main/java/sleeper/splitter/split/SplitPartitionResult.java index 12f0e29b7a..3f31466f86 100644 --- a/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartitionResult.java +++ b/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/SplitPartitionResult.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package sleeper.splitter; +package sleeper.splitter.split; import sleeper.core.partition.Partition; diff --git a/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartitionResultFactory.java b/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/SplitPartitionResultFactory.java similarity index 99% rename from java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartitionResultFactory.java rename to java/splitter/splitter-core/src/main/java/sleeper/splitter/split/SplitPartitionResultFactory.java index 41603bf070..de86c5ce12 100644 --- a/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartitionResultFactory.java +++ b/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/SplitPartitionResultFactory.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package sleeper.splitter; +package sleeper.splitter.split; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/java/splitter/splitter-core/src/test/java/sleeper/splitter/FindPartitionsToSplitTest.java b/java/splitter/splitter-core/src/test/java/sleeper/splitter/find/FindPartitionsToSplitTest.java similarity index 99% rename from java/splitter/splitter-core/src/test/java/sleeper/splitter/FindPartitionsToSplitTest.java rename to java/splitter/splitter-core/src/test/java/sleeper/splitter/find/FindPartitionsToSplitTest.java index 4d119cb8bf..c299d631de 100644 --- a/java/splitter/splitter-core/src/test/java/sleeper/splitter/FindPartitionsToSplitTest.java +++ b/java/splitter/splitter-core/src/test/java/sleeper/splitter/find/FindPartitionsToSplitTest.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package sleeper.splitter; +package sleeper.splitter.find; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; diff --git a/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionJobDefinitionSerDeTest.java b/java/splitter/splitter-core/src/test/java/sleeper/splitter/find/SplitPartitionJobDefinitionSerDeTest.java similarity index 99% rename from java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionJobDefinitionSerDeTest.java rename to java/splitter/splitter-core/src/test/java/sleeper/splitter/find/SplitPartitionJobDefinitionSerDeTest.java index 6a4e2b51d2..525957341f 100644 --- a/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionJobDefinitionSerDeTest.java +++ b/java/splitter/splitter-core/src/test/java/sleeper/splitter/find/SplitPartitionJobDefinitionSerDeTest.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package sleeper.splitter; +package sleeper.splitter.find; import org.junit.jupiter.api.Test; diff --git a/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionTest.java b/java/splitter/splitter-core/src/test/java/sleeper/splitter/split/SplitPartitionTest.java similarity index 99% rename from java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionTest.java rename to java/splitter/splitter-core/src/test/java/sleeper/splitter/split/SplitPartitionTest.java index 75000cf87f..c4057a78f5 100644 --- a/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionTest.java +++ b/java/splitter/splitter-core/src/test/java/sleeper/splitter/split/SplitPartitionTest.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package sleeper.splitter; +package sleeper.splitter.split; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Nested; @@ -33,7 +33,7 @@ import sleeper.core.statestore.FileReferenceFactory; import sleeper.core.statestore.StateStore; import sleeper.sketches.Sketches; -import sleeper.splitter.FindPartitionSplitPoint.SketchesLoader; +import sleeper.splitter.split.FindPartitionSplitPoint.SketchesLoader; import java.util.ArrayList; import java.util.Arrays; diff --git a/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/FindPartitionsToSplitLambda.java b/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/FindPartitionsToSplitLambda.java index d598f9108c..fe1ce7a674 100644 --- a/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/FindPartitionsToSplitLambda.java +++ b/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/FindPartitionsToSplitLambda.java @@ -37,7 +37,7 @@ import sleeper.core.table.TableStatus; import sleeper.core.util.LoggedDuration; import sleeper.io.parquet.utils.HadoopConfigurationProvider; -import sleeper.splitter.FindPartitionsToSplit; +import sleeper.splitter.find.FindPartitionsToSplit; import sleeper.statestore.StateStoreProvider; import java.time.Instant; diff --git a/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/SplitPartitionLambda.java b/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/SplitPartitionLambda.java index b83220b4d8..8ebb0b0f09 100644 --- a/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/SplitPartitionLambda.java +++ b/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/SplitPartitionLambda.java @@ -33,9 +33,9 @@ import sleeper.core.statestore.StateStore; import sleeper.core.statestore.StateStoreException; import sleeper.io.parquet.utils.HadoopConfigurationProvider; -import sleeper.splitter.SplitPartition; -import sleeper.splitter.SplitPartitionJobDefinition; -import sleeper.splitter.SplitPartitionJobDefinitionSerDe; +import sleeper.splitter.find.SplitPartitionJobDefinition; +import sleeper.splitter.find.SplitPartitionJobDefinitionSerDe; +import sleeper.splitter.split.SplitPartition; import sleeper.statestore.StateStoreProvider; import java.io.IOException; diff --git a/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/SqsSplitPartitionJobSender.java b/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/SqsSplitPartitionJobSender.java index 49ac998e22..9bdf27e5ae 100644 --- a/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/SqsSplitPartitionJobSender.java +++ b/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/SqsSplitPartitionJobSender.java @@ -23,8 +23,8 @@ import sleeper.configuration.properties.instance.InstanceProperties; import sleeper.configuration.properties.table.TablePropertiesProvider; -import sleeper.splitter.SplitPartitionJobDefinition; -import sleeper.splitter.SplitPartitionJobDefinitionSerDe; +import sleeper.splitter.find.SplitPartitionJobDefinition; +import sleeper.splitter.find.SplitPartitionJobDefinitionSerDe; import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.PARTITION_SPLITTING_JOB_QUEUE_URL; diff --git a/java/splitter/splitter-lambda/src/test/java/sleeper/splitter/lambda/FindPartitionsToSplitIT.java b/java/splitter/splitter-lambda/src/test/java/sleeper/splitter/lambda/FindPartitionsToSplitIT.java index 877ed9e9cc..4666a918c7 100644 --- a/java/splitter/splitter-lambda/src/test/java/sleeper/splitter/lambda/FindPartitionsToSplitIT.java +++ b/java/splitter/splitter-lambda/src/test/java/sleeper/splitter/lambda/FindPartitionsToSplitIT.java @@ -45,9 +45,9 @@ import sleeper.ingest.impl.ParquetConfiguration; import sleeper.ingest.impl.partitionfilewriter.DirectPartitionFileWriterFactory; import sleeper.ingest.impl.recordbatch.arraylist.ArrayListRecordBatchFactory; -import sleeper.splitter.FindPartitionsToSplit; -import sleeper.splitter.SplitPartitionJobDefinition; -import sleeper.splitter.SplitPartitionJobDefinitionSerDe; +import sleeper.splitter.find.FindPartitionsToSplit; +import sleeper.splitter.find.SplitPartitionJobDefinition; +import sleeper.splitter.find.SplitPartitionJobDefinitionSerDe; import sleeper.statestore.FixedStateStoreProvider; import java.io.File; diff --git a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/partitioning/WaitForPartitionSplitting.java b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/partitioning/WaitForPartitionSplitting.java index 531495eac2..1b954d3ae7 100644 --- a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/partitioning/WaitForPartitionSplitting.java +++ b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/partitioning/WaitForPartitionSplitting.java @@ -24,8 +24,8 @@ import sleeper.core.statestore.StateStore; import sleeper.core.statestore.StateStoreException; import sleeper.core.util.PollWithRetries; -import sleeper.splitter.FindPartitionToSplitResult; -import sleeper.splitter.FindPartitionsToSplit; +import sleeper.splitter.find.FindPartitionToSplitResult; +import sleeper.splitter.find.FindPartitionsToSplit; import sleeper.systemtest.dsl.instance.SystemTestInstanceContext; import java.time.Duration; diff --git a/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/drivers/InMemoryPartitionSplittingDriver.java b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/drivers/InMemoryPartitionSplittingDriver.java index eb970e8577..f996cfc160 100644 --- a/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/drivers/InMemoryPartitionSplittingDriver.java +++ b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/drivers/InMemoryPartitionSplittingDriver.java @@ -21,9 +21,9 @@ import sleeper.configuration.properties.table.TableProperties; import sleeper.core.statestore.StateStore; import sleeper.core.statestore.StateStoreException; -import sleeper.splitter.FindPartitionsToSplit; -import sleeper.splitter.FindPartitionsToSplit.JobSender; -import sleeper.splitter.SplitPartition; +import sleeper.splitter.find.FindPartitionsToSplit; +import sleeper.splitter.find.FindPartitionsToSplit.JobSender; +import sleeper.splitter.split.SplitPartition; import sleeper.systemtest.dsl.instance.SystemTestInstanceContext; import sleeper.systemtest.dsl.partitioning.PartitionSplittingDriver; From 3681da2a76e59292f15a3cbe238acae8ee7e53ae Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Thu, 8 Aug 2024 15:08:31 +0000 Subject: [PATCH 11/16] Move logs for updating state store into SplitPartition --- .../splitter/split/SplitPartition.java | 22 ++++++++++++++++++- .../split/SplitPartitionResultFactory.java | 6 ----- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/SplitPartition.java b/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/SplitPartition.java index 56898c991c..48c367ecd1 100644 --- a/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/SplitPartition.java +++ b/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/SplitPartition.java @@ -16,8 +16,11 @@ package sleeper.splitter.split; import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import sleeper.core.partition.Partition; +import sleeper.core.schema.Field; import sleeper.core.schema.Schema; import sleeper.core.statestore.StateStore; import sleeper.core.statestore.StateStoreException; @@ -45,6 +48,8 @@ * contradiction. */ public class SplitPartition { + public static final Logger LOGGER = LoggerFactory.getLogger(SplitPartition.class); + private final StateStore stateStore; private final Schema schema; private final SketchesLoader sketchesLoader; @@ -79,9 +84,24 @@ public void splitPartition(Partition partition, List fileNames) throws S if (splitPointOpt.isPresent()) { SplitPartitionResult result = new SplitPartitionResultFactory(schema, idSupplier) .splitPartition(partition, splitPointOpt.get(), dimension); - stateStore.atomicallyUpdatePartitionAndCreateNewOnes(result.getParentPartition(), result.getLeftChild(), result.getRightChild()); + apply(result); return; } } } + + private void apply(SplitPartitionResult result) throws StateStoreException { + + Partition parentPartition = result.getParentPartition(); + Partition leftChild = result.getLeftChild(); + Partition rightChild = result.getRightChild(); + Field splitField = schema.getRowKeyFields().get(parentPartition.getDimension()); + LOGGER.info("Updating StateStore:"); + LOGGER.info("Split partition ({}) is marked as not a leaf partition, split on field {}", + parentPartition.getId(), splitField.getName()); + LOGGER.info("New partition: {}", leftChild); + LOGGER.info("New partition: {}", rightChild); + + stateStore.atomicallyUpdatePartitionAndCreateNewOnes(parentPartition, leftChild, rightChild); + } } diff --git a/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/SplitPartitionResultFactory.java b/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/SplitPartitionResultFactory.java index de86c5ce12..eb25af9f0d 100644 --- a/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/SplitPartitionResultFactory.java +++ b/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/SplitPartitionResultFactory.java @@ -99,12 +99,6 @@ public SplitPartitionResult splitPartition(Partition partition, Object splitPoin .childPartitionIds(Arrays.asList(leftChild.getId(), rightChild.getId())) .dimension(dimension).build(); - LOGGER.info("Updating StateStore:"); - LOGGER.info("Split partition ({}) is marked as not a leaf partition, split on field {}", - partition.getId(), fieldToSplitOn.getName()); - LOGGER.info("New partition: {}", leftChild); - LOGGER.info("New partition: {}", rightChild); - return new SplitPartitionResult(partition, leftChild, rightChild); } From 46d7fd6c745c7f9515acbd9974678e79e547c1cb Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Thu, 8 Aug 2024 15:21:58 +0000 Subject: [PATCH 12/16] Stream through dimensions in SplitPartition --- .../split/FindPartitionSplitPoint.java | 27 +++++++++----- .../splitter/split/SplitPartition.java | 37 ++++++++++++------- .../split/SplitPartitionResultFactory.java | 3 +- 3 files changed, 42 insertions(+), 25 deletions(-) diff --git a/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/FindPartitionSplitPoint.java b/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/FindPartitionSplitPoint.java index 4496d78d31..2576a5d308 100644 --- a/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/FindPartitionSplitPoint.java +++ b/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/FindPartitionSplitPoint.java @@ -35,6 +35,7 @@ import sleeper.sketches.s3.SketchesSerDeToS3; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Comparator; import java.util.List; @@ -60,7 +61,7 @@ public FindPartitionSplitPoint(Schema schema, List fileNames, SketchesLo this.sketchesLoader = sketchesLoader; } - public Optional splitPointForDimension(int dimension) throws IOException { + public Optional splitPointForDimension(int dimension) { PrimitiveType rowKeyType = rowKeyTypes.get(dimension); LOGGER.info("Testing field {} of type {} (dimension {}) to see if it can be split", schema.getRowKeyFieldNames().get(dimension), rowKeyType, dimension); @@ -100,7 +101,7 @@ private > Optional splitPointForDimension( } } - private Triple getMinMedianMaxIntKey(int dimension) throws IOException { + private Triple getMinMedianMaxIntKey(int dimension) { String keyField = schema.getRowKeyFields().get(dimension).getName(); // Read all sketches @@ -108,7 +109,7 @@ private Triple getMinMedianMaxIntKey(int dimension) t for (String fileName : fileNames) { String sketchesFile = fileName.replace(".parquet", ".sketches"); LOGGER.info("Loading Sketches from {}", sketchesFile); - Sketches sketches = sketchesLoader.load(sketchesFile); + Sketches sketches = loadSketches(sketchesFile); sketchList.add(sketches.getQuantilesSketch(keyField)); } @@ -125,7 +126,7 @@ private Triple getMinMedianMaxIntKey(int dimension) t return new ImmutableTriple<>(min, median, max); } - private Triple getMinMedianMaxLongKey(int dimension) throws IOException { + private Triple getMinMedianMaxLongKey(int dimension) { String keyField = schema.getRowKeyFields().get(dimension).getName(); // Read all sketches @@ -133,7 +134,7 @@ private Triple getMinMedianMaxLongKey(int dimension) throws IO for (String fileName : fileNames) { String sketchesFile = fileName.replace(".parquet", ".sketches"); LOGGER.info("Loading Sketches from {}", sketchesFile); - Sketches sketches = sketchesLoader.load(sketchesFile); + Sketches sketches = loadSketches(sketchesFile); sketchList.add(sketches.getQuantilesSketch(keyField)); } @@ -150,7 +151,7 @@ private Triple getMinMedianMaxLongKey(int dimension) throws IO return new ImmutableTriple<>(min, median, max); } - private Triple getMinMedianMaxStringKey(int dimension) throws IOException { + private Triple getMinMedianMaxStringKey(int dimension) { String keyField = schema.getRowKeyFields().get(dimension).getName(); // Read all sketches @@ -158,7 +159,7 @@ private Triple getMinMedianMaxStringKey(int dimension) t for (String fileName : fileNames) { String sketchesFile = fileName.replace(".parquet", ".sketches"); LOGGER.info("Loading Sketches from {}", sketchesFile); - Sketches sketches = sketchesLoader.load(sketchesFile); + Sketches sketches = loadSketches(sketchesFile); sketchList.add(sketches.getQuantilesSketch(keyField)); } @@ -175,7 +176,7 @@ private Triple getMinMedianMaxStringKey(int dimension) t return new ImmutableTriple<>(min, median, max); } - private Triple getMinMedianMaxByteArrayKey(int dimension) throws IOException { + private Triple getMinMedianMaxByteArrayKey(int dimension) { String keyField = schema.getRowKeyFields().get(dimension).getName(); // Read all sketches @@ -183,7 +184,7 @@ private Triple getMinMedianMaxByteArrayKey(int for (String fileName : fileNames) { String sketchesFile = fileName.replace(".parquet", ".sketches"); LOGGER.info("Loading Sketches from {}", sketchesFile); - Sketches sketches = sketchesLoader.load(sketchesFile); + Sketches sketches = loadSketches(sketchesFile); sketchList.add(sketches.getQuantilesSketch(keyField)); } @@ -200,6 +201,14 @@ private Triple getMinMedianMaxByteArrayKey(int return new ImmutableTriple<>(min, median, max); } + private Sketches loadSketches(String filename) { + try { + return sketchesLoader.load(filename); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + public interface SketchesLoader { Sketches load(String filename) throws IOException; } diff --git a/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/SplitPartition.java b/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/SplitPartition.java index 48c367ecd1..c317d45a0c 100644 --- a/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/SplitPartition.java +++ b/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/SplitPartition.java @@ -26,11 +26,11 @@ import sleeper.core.statestore.StateStoreException; import sleeper.splitter.split.FindPartitionSplitPoint.SketchesLoader; -import java.io.IOException; import java.util.List; import java.util.Optional; import java.util.UUID; import java.util.function.Supplier; +import java.util.stream.IntStream; import static sleeper.splitter.split.FindPartitionSplitPoint.loadSketchesFromFile; @@ -77,20 +77,25 @@ public SplitPartition(StateStore stateStore, this.idSupplier = idSupplier; } - public void splitPartition(Partition partition, List fileNames) throws StateStoreException, IOException { - FindPartitionSplitPoint findSplitPoint = new FindPartitionSplitPoint(schema, fileNames, sketchesLoader::load); - for (int dimension = 0; dimension < schema.getRowKeyFields().size(); dimension++) { - Optional splitPointOpt = findSplitPoint.splitPointForDimension(dimension); - if (splitPointOpt.isPresent()) { - SplitPartitionResult result = new SplitPartitionResultFactory(schema, idSupplier) - .splitPartition(partition, splitPointOpt.get(), dimension); - apply(result); - return; - } - } + public void splitPartition(Partition partition, List fileNames) { + getResultIfSplittable(partition, fileNames) + .ifPresent(this::apply); + } + + private Optional getResultIfSplittable(Partition partition, List fileNames) { + FindPartitionSplitPoint findSplitPoint = new FindPartitionSplitPoint(schema, fileNames, sketchesLoader); + return IntStream.range(0, schema.getRowKeyFields().size()) + .mapToObj(dimension -> findSplitPoint.splitPointForDimension(dimension) + .map(splitPoint -> resultFactory().splitPartition(partition, splitPoint, dimension))) + .flatMap(Optional::stream) + .findFirst(); } - private void apply(SplitPartitionResult result) throws StateStoreException { + private SplitPartitionResultFactory resultFactory() { + return new SplitPartitionResultFactory(schema, idSupplier); + } + + private void apply(SplitPartitionResult result) { Partition parentPartition = result.getParentPartition(); Partition leftChild = result.getLeftChild(); @@ -102,6 +107,10 @@ private void apply(SplitPartitionResult result) throws StateStoreException { LOGGER.info("New partition: {}", leftChild); LOGGER.info("New partition: {}", rightChild); - stateStore.atomicallyUpdatePartitionAndCreateNewOnes(parentPartition, leftChild, rightChild); + try { + stateStore.atomicallyUpdatePartitionAndCreateNewOnes(parentPartition, leftChild, rightChild); + } catch (StateStoreException e) { + throw new RuntimeException(e); + } } } diff --git a/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/SplitPartitionResultFactory.java b/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/SplitPartitionResultFactory.java index eb25af9f0d..73983aa26a 100644 --- a/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/SplitPartitionResultFactory.java +++ b/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/SplitPartitionResultFactory.java @@ -24,7 +24,6 @@ import sleeper.core.range.Region; import sleeper.core.schema.Field; import sleeper.core.schema.Schema; -import sleeper.core.statestore.StateStoreException; import java.util.ArrayList; import java.util.Arrays; @@ -62,7 +61,7 @@ public SplitPartitionResultFactory(Schema schema, Supplier idSupplier) { this.idSupplier = idSupplier; } - public SplitPartitionResult splitPartition(Partition partition, Object splitPoint, int dimension) throws StateStoreException { + public SplitPartitionResult splitPartition(Partition partition, Object splitPoint, int dimension) { Field fieldToSplitOn = schema.getRowKeyFields().get(dimension); LOGGER.info("Splitting partition {} on split point {} in dimension {}", partition.getId(), splitPoint, dimension); From 5a6a352f8daa916588f4a942adc2ce55d8b8993b Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Fri, 9 Aug 2024 07:54:41 +0000 Subject: [PATCH 13/16] Stop catching exceptions in SplitPartitionLambda --- .../splitter/lambda/SplitPartitionLambda.java | 25 +++++++------------ 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/SplitPartitionLambda.java b/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/SplitPartitionLambda.java index 8ebb0b0f09..74721734c3 100644 --- a/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/SplitPartitionLambda.java +++ b/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/SplitPartitionLambda.java @@ -31,15 +31,12 @@ import sleeper.configuration.properties.table.TableProperties; import sleeper.configuration.properties.table.TablePropertiesProvider; import sleeper.core.statestore.StateStore; -import sleeper.core.statestore.StateStoreException; import sleeper.io.parquet.utils.HadoopConfigurationProvider; import sleeper.splitter.find.SplitPartitionJobDefinition; import sleeper.splitter.find.SplitPartitionJobDefinitionSerDe; import sleeper.splitter.split.SplitPartition; import sleeper.statestore.StateStoreProvider; -import java.io.IOException; - import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET; /** @@ -72,19 +69,15 @@ public SplitPartitionLambda() { @Override public Void handleRequest(SQSEvent event, Context context) { propertiesReloader.reloadIfNeeded(); - try { - for (SQSEvent.SQSMessage message : event.getRecords()) { - String serialisedJob = message.getBody(); - SplitPartitionJobDefinition job = new SplitPartitionJobDefinitionSerDe(tablePropertiesProvider) - .fromJson(serialisedJob); - LOGGER.info("Received partition splitting job {}", job); - TableProperties tableProperties = tablePropertiesProvider.getById(job.getTableId()); - StateStore stateStore = stateStoreProvider.getStateStore(tableProperties); - SplitPartition splitPartition = new SplitPartition(stateStore, tableProperties.getSchema(), conf); - splitPartition.splitPartition(job.getPartition(), job.getFileNames()); - } - } catch (IOException | StateStoreException ex) { - LOGGER.error("Exception handling partition splitting job", ex); + for (SQSEvent.SQSMessage message : event.getRecords()) { + String serialisedJob = message.getBody(); + SplitPartitionJobDefinition job = new SplitPartitionJobDefinitionSerDe(tablePropertiesProvider) + .fromJson(serialisedJob); + LOGGER.info("Received partition splitting job {}", job); + TableProperties tableProperties = tablePropertiesProvider.getById(job.getTableId()); + StateStore stateStore = stateStoreProvider.getStateStore(tableProperties); + SplitPartition splitPartition = new SplitPartition(stateStore, tableProperties.getSchema(), conf); + splitPartition.splitPartition(job.getPartition(), job.getFileNames()); } return null; } From b22dc65899ed621a7411abc0a8210b9933f2419d Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Fri, 9 Aug 2024 07:57:36 +0000 Subject: [PATCH 14/16] Use SQSBatchResponse in SplitPartitionLambda --- .../splitter/lambda/SplitPartitionLambda.java | 32 ++++++++++++------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/SplitPartitionLambda.java b/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/SplitPartitionLambda.java index 74721734c3..fb83ced882 100644 --- a/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/SplitPartitionLambda.java +++ b/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/SplitPartitionLambda.java @@ -19,6 +19,8 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; +import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse.BatchItemFailure; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; @@ -37,13 +39,15 @@ import sleeper.splitter.split.SplitPartition; import sleeper.statestore.StateStoreProvider; +import java.util.ArrayList; +import java.util.List; + import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET; /** * Triggered by an SQS event containing a partition splitting job to do. */ -@SuppressWarnings("unused") -public class SplitPartitionLambda implements RequestHandler { +public class SplitPartitionLambda implements RequestHandler { private final PropertiesReloader propertiesReloader; private final Configuration conf; private static final Logger LOGGER = LoggerFactory.getLogger(SplitPartitionLambda.class); @@ -67,18 +71,22 @@ public SplitPartitionLambda() { } @Override - public Void handleRequest(SQSEvent event, Context context) { + public SQSBatchResponse handleRequest(SQSEvent event, Context context) { propertiesReloader.reloadIfNeeded(); + List batchItemFailures = new ArrayList<>(); for (SQSEvent.SQSMessage message : event.getRecords()) { - String serialisedJob = message.getBody(); - SplitPartitionJobDefinition job = new SplitPartitionJobDefinitionSerDe(tablePropertiesProvider) - .fromJson(serialisedJob); - LOGGER.info("Received partition splitting job {}", job); - TableProperties tableProperties = tablePropertiesProvider.getById(job.getTableId()); - StateStore stateStore = stateStoreProvider.getStateStore(tableProperties); - SplitPartition splitPartition = new SplitPartition(stateStore, tableProperties.getSchema(), conf); - splitPartition.splitPartition(job.getPartition(), job.getFileNames()); + try { + SplitPartitionJobDefinition job = new SplitPartitionJobDefinitionSerDe(tablePropertiesProvider) + .fromJson(message.getBody()); + LOGGER.info("Received partition splitting job {}", job); + TableProperties tableProperties = tablePropertiesProvider.getById(job.getTableId()); + StateStore stateStore = stateStoreProvider.getStateStore(tableProperties); + SplitPartition splitPartition = new SplitPartition(stateStore, tableProperties.getSchema(), conf); + splitPartition.splitPartition(job.getPartition(), job.getFileNames()); + } catch (RuntimeException e) { + batchItemFailures.add(new BatchItemFailure(message.getMessageId())); + } } - return null; + return new SQSBatchResponse(batchItemFailures); } } From 211ff66c1e7f8372f6c576e1d437d2f288a834ba Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Fri, 9 Aug 2024 07:58:16 +0000 Subject: [PATCH 15/16] Log partition splitting failure --- .../main/java/sleeper/splitter/lambda/SplitPartitionLambda.java | 1 + 1 file changed, 1 insertion(+) diff --git a/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/SplitPartitionLambda.java b/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/SplitPartitionLambda.java index fb83ced882..dc70ab849e 100644 --- a/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/SplitPartitionLambda.java +++ b/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/SplitPartitionLambda.java @@ -84,6 +84,7 @@ public SQSBatchResponse handleRequest(SQSEvent event, Context context) { SplitPartition splitPartition = new SplitPartition(stateStore, tableProperties.getSchema(), conf); splitPartition.splitPartition(job.getPartition(), job.getFileNames()); } catch (RuntimeException e) { + LOGGER.error("Failed partition splitting", e); batchItemFailures.add(new BatchItemFailure(message.getMessageId())); } } From 4c873f7932d4ca859a753ddc676d4a9e3c7f3794 Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Fri, 9 Aug 2024 08:10:28 +0000 Subject: [PATCH 16/16] Stop catching exceptions in InMemoryPartitionSplittingDriver --- .../drivers/InMemoryPartitionSplittingDriver.java | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/drivers/InMemoryPartitionSplittingDriver.java b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/drivers/InMemoryPartitionSplittingDriver.java index f996cfc160..9613ff0520 100644 --- a/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/drivers/InMemoryPartitionSplittingDriver.java +++ b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/drivers/InMemoryPartitionSplittingDriver.java @@ -27,8 +27,6 @@ import sleeper.systemtest.dsl.instance.SystemTestInstanceContext; import sleeper.systemtest.dsl.partitioning.PartitionSplittingDriver; -import java.io.IOException; - public class InMemoryPartitionSplittingDriver implements PartitionSplittingDriver { public static final Logger LOGGER = LoggerFactory.getLogger(InMemoryPartitionSplittingDriver.class); private final SystemTestInstanceContext instance; @@ -59,11 +57,7 @@ private JobSender splitPartition() { TableProperties tableProperties = instance.getTablePropertiesProvider().getById(job.getTableId()); StateStore stateStore = instance.getStateStoreProvider().getStateStore(tableProperties); SplitPartition splitPartition = new SplitPartition(stateStore, tableProperties.getSchema(), sketches::load); - try { - splitPartition.splitPartition(job.getPartition(), job.getFileNames()); - } catch (IOException | StateStoreException e) { - throw new RuntimeException("Failed to split partition", e); - } + splitPartition.splitPartition(job.getPartition(), job.getFileNames()); }; } }