diff --git a/java/athena/src/test/java/sleeper/athena/metadata/IteratorApplyingMetadataHandlerIT.java b/java/athena/src/test/java/sleeper/athena/metadata/IteratorApplyingMetadataHandlerIT.java index 63b8dacc76..53a27c6b6e 100644 --- a/java/athena/src/test/java/sleeper/athena/metadata/IteratorApplyingMetadataHandlerIT.java +++ b/java/athena/src/test/java/sleeper/athena/metadata/IteratorApplyingMetadataHandlerIT.java @@ -49,7 +49,7 @@ import sleeper.core.partition.Partition; import sleeper.core.schema.Field; import sleeper.core.statestore.StateStore; -import sleeper.splitter.SplitPartition; +import sleeper.splitter.split.SplitPartition; import sleeper.statestore.StateStoreFactory; import java.util.ArrayList; diff --git a/java/athena/src/test/java/sleeper/athena/metadata/SleeperMetadataHandlerIT.java b/java/athena/src/test/java/sleeper/athena/metadata/SleeperMetadataHandlerIT.java index 2beb36d0bb..6d3dce4e73 100644 --- a/java/athena/src/test/java/sleeper/athena/metadata/SleeperMetadataHandlerIT.java +++ b/java/athena/src/test/java/sleeper/athena/metadata/SleeperMetadataHandlerIT.java @@ -52,7 +52,7 @@ import sleeper.configuration.properties.table.TableProperties; import sleeper.core.partition.Partition; import sleeper.core.statestore.StateStore; -import sleeper.splitter.SplitPartition; +import sleeper.splitter.split.SplitPartition; import sleeper.statestore.StateStoreFactory; import java.io.IOException; diff --git a/java/clients/src/main/java/sleeper/clients/status/report/DeadLettersStatusReport.java b/java/clients/src/main/java/sleeper/clients/status/report/DeadLettersStatusReport.java index c3569ac7b0..b26ceaa3a0 100644 --- a/java/clients/src/main/java/sleeper/clients/status/report/DeadLettersStatusReport.java +++ b/java/clients/src/main/java/sleeper/clients/status/report/DeadLettersStatusReport.java @@ -30,7 +30,7 @@ import sleeper.configuration.properties.instance.InstanceProperties; import sleeper.configuration.properties.table.TablePropertiesProvider; import sleeper.query.model.QuerySerDe; -import sleeper.splitter.SplitPartitionJobDefinitionSerDe; +import sleeper.splitter.find.SplitPartitionJobDefinitionSerDe; import sleeper.task.common.QueueMessageCount; import java.io.IOException; diff --git a/java/clients/src/main/java/sleeper/clients/status/report/partitions/PartitionStatus.java b/java/clients/src/main/java/sleeper/clients/status/report/partitions/PartitionStatus.java index 72735cfacb..2cab530ad3 100644 --- a/java/clients/src/main/java/sleeper/clients/status/report/partitions/PartitionStatus.java +++ b/java/clients/src/main/java/sleeper/clients/status/report/partitions/PartitionStatus.java @@ -22,8 +22,8 @@ import sleeper.core.schema.Field; import sleeper.core.schema.Schema; import sleeper.core.statestore.FileReference; -import sleeper.splitter.FindPartitionsToSplit; -import sleeper.splitter.PartitionSplitCheck; +import sleeper.splitter.find.FindPartitionsToSplit; +import sleeper.splitter.find.PartitionSplitCheck; import java.util.List; import java.util.Set; diff --git a/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartition.java b/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartition.java deleted file mode 100644 index 5cf08b36c7..0000000000 --- a/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartition.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright 2022-2024 Crown Copyright - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package sleeper.splitter; - -import org.apache.hadoop.conf.Configuration; - -import sleeper.core.partition.Partition; -import sleeper.core.schema.Schema; -import sleeper.core.statestore.StateStore; -import sleeper.core.statestore.StateStoreException; -import sleeper.splitter.SplitMultiDimensionalPartitionImpl.SketchesLoader; - -import java.io.IOException; -import java.util.List; -import java.util.UUID; -import java.util.function.Supplier; - -import static sleeper.splitter.SplitMultiDimensionalPartitionImpl.loadFromFile; - -/** - * Splits a partition. Delegates to {@link SplitMultiDimensionalPartitionImpl}. - */ -public class SplitPartition { - private final StateStore stateStore; - private final Schema schema; - private final SketchesLoader sketchesLoader; - private final Supplier idSupplier; - - public SplitPartition(StateStore stateStore, - Schema schema, - Configuration conf) { - this(stateStore, schema, loadFromFile(schema, conf)); - } - - public SplitPartition(StateStore stateStore, - Schema schema, - SketchesLoader sketchesLoader) { - this(stateStore, schema, sketchesLoader, () -> UUID.randomUUID().toString()); - } - - public SplitPartition(StateStore stateStore, - Schema schema, - SketchesLoader sketchesLoader, - Supplier idSupplier) { - this.stateStore = stateStore; - this.schema = schema; - this.sketchesLoader = sketchesLoader; - this.idSupplier = idSupplier; - } - - public void splitPartition(Partition partition, List fileNames) throws StateStoreException, IOException { - new SplitMultiDimensionalPartitionImpl(stateStore, schema, partition, fileNames, idSupplier, sketchesLoader) - .splitPartition(); - } -} diff --git a/java/splitter/splitter-core/src/main/java/sleeper/splitter/FindPartitionToSplitResult.java b/java/splitter/splitter-core/src/main/java/sleeper/splitter/find/FindPartitionToSplitResult.java similarity index 97% rename from java/splitter/splitter-core/src/main/java/sleeper/splitter/FindPartitionToSplitResult.java rename to java/splitter/splitter-core/src/main/java/sleeper/splitter/find/FindPartitionToSplitResult.java index 6b7af79eef..463ec20bb6 100644 --- a/java/splitter/splitter-core/src/main/java/sleeper/splitter/FindPartitionToSplitResult.java +++ b/java/splitter/splitter-core/src/main/java/sleeper/splitter/find/FindPartitionToSplitResult.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package sleeper.splitter; +package sleeper.splitter.find; import sleeper.core.partition.Partition; import sleeper.core.statestore.FileReference; diff --git a/java/splitter/splitter-core/src/main/java/sleeper/splitter/FindPartitionsToSplit.java b/java/splitter/splitter-core/src/main/java/sleeper/splitter/find/FindPartitionsToSplit.java similarity index 99% rename from java/splitter/splitter-core/src/main/java/sleeper/splitter/FindPartitionsToSplit.java rename to java/splitter/splitter-core/src/main/java/sleeper/splitter/find/FindPartitionsToSplit.java index 9d208d0a1c..c3f911483e 100644 --- a/java/splitter/splitter-core/src/main/java/sleeper/splitter/FindPartitionsToSplit.java +++ b/java/splitter/splitter-core/src/main/java/sleeper/splitter/find/FindPartitionsToSplit.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package sleeper.splitter; +package sleeper.splitter.find; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/java/splitter/splitter-core/src/main/java/sleeper/splitter/PartitionSplitCheck.java b/java/splitter/splitter-core/src/main/java/sleeper/splitter/find/PartitionSplitCheck.java similarity index 98% rename from java/splitter/splitter-core/src/main/java/sleeper/splitter/PartitionSplitCheck.java rename to java/splitter/splitter-core/src/main/java/sleeper/splitter/find/PartitionSplitCheck.java index d34a1b98d6..b5ffc284ab 100644 --- a/java/splitter/splitter-core/src/main/java/sleeper/splitter/PartitionSplitCheck.java +++ b/java/splitter/splitter-core/src/main/java/sleeper/splitter/find/PartitionSplitCheck.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package sleeper.splitter; +package sleeper.splitter.find; import sleeper.configuration.properties.table.TableProperties; import sleeper.core.statestore.FileReference; diff --git a/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartitionJobDefinition.java b/java/splitter/splitter-core/src/main/java/sleeper/splitter/find/SplitPartitionJobDefinition.java similarity index 98% rename from java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartitionJobDefinition.java rename to java/splitter/splitter-core/src/main/java/sleeper/splitter/find/SplitPartitionJobDefinition.java index ae82d1c3e1..6344a2a568 100644 --- a/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartitionJobDefinition.java +++ b/java/splitter/splitter-core/src/main/java/sleeper/splitter/find/SplitPartitionJobDefinition.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package sleeper.splitter; +package sleeper.splitter.find; import sleeper.core.partition.Partition; diff --git a/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartitionJobDefinitionSerDe.java b/java/splitter/splitter-core/src/main/java/sleeper/splitter/find/SplitPartitionJobDefinitionSerDe.java similarity index 99% rename from java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartitionJobDefinitionSerDe.java rename to java/splitter/splitter-core/src/main/java/sleeper/splitter/find/SplitPartitionJobDefinitionSerDe.java index df0ef73244..1e8fc95f52 100644 --- a/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitPartitionJobDefinitionSerDe.java +++ b/java/splitter/splitter-core/src/main/java/sleeper/splitter/find/SplitPartitionJobDefinitionSerDe.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package sleeper.splitter; +package sleeper.splitter.find; import com.google.gson.Gson; import com.google.gson.GsonBuilder; diff --git a/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitMultiDimensionalPartitionImpl.java b/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/FindPartitionSplitPoint.java similarity index 58% rename from java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitMultiDimensionalPartitionImpl.java rename to java/splitter/splitter-core/src/main/java/sleeper/splitter/split/FindPartitionSplitPoint.java index b14f479ba5..2576a5d308 100644 --- a/java/splitter/splitter-core/src/main/java/sleeper/splitter/SplitMultiDimensionalPartitionImpl.java +++ b/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/FindPartitionSplitPoint.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package sleeper.splitter; +package sleeper.splitter.split; import com.facebook.collections.ByteArray; import org.apache.commons.lang3.tuple.ImmutableTriple; @@ -25,88 +25,43 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import sleeper.core.partition.Partition; -import sleeper.core.range.Range; -import sleeper.core.range.Range.RangeFactory; -import sleeper.core.range.Region; -import sleeper.core.schema.Field; import sleeper.core.schema.Schema; import sleeper.core.schema.type.ByteArrayType; import sleeper.core.schema.type.IntType; import sleeper.core.schema.type.LongType; import sleeper.core.schema.type.PrimitiveType; import sleeper.core.schema.type.StringType; -import sleeper.core.statestore.StateStore; -import sleeper.core.statestore.StateStoreException; import sleeper.sketches.Sketches; import sleeper.sketches.s3.SketchesSerDeToS3; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.Optional; import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collectors; /** - * Identifies the median value of the first dimension. If that leads to a valid - * split (i.e. one where it is not equal to the minimum value and not equal to - * the maximum value) then that is used to split the partition. If it doesn't - * lead to a valid split then the above is repeated for the second dimension. - * This continues until either a valid split is found or no split is possible. - *

- * Note that there are two situations in which a partition cannot be split: - * - If the partition consists of a single point (i.e. the minimum - * equals the maximum). - * - If the median equals the minimum then the partition cannot be split. - * This is because it would have to be split into [min, median) and [median, max), - * but if the min equals the median then the left one can't have any data in it - * as a key x in it would have to have min <= x < median = min which is a - * contradiction. - *

+ * Finds a split point for a partition by examining the sketches for each file. */ -public class SplitMultiDimensionalPartitionImpl { - private static final Logger LOGGER = LoggerFactory.getLogger(SplitMultiDimensionalPartitionImpl.class); +public class FindPartitionSplitPoint { + + public static final Logger LOGGER = LoggerFactory.getLogger(FindPartitionSplitPoint.class); - private final StateStore stateStore; private final Schema schema; private final List rowKeyTypes; - private final Partition partition; - private final List fileNames; // These should be active files for the partition - private final RangeFactory rangeFactory; - private final Supplier idSupplier; + private final List fileNames; private final SketchesLoader sketchesLoader; - public SplitMultiDimensionalPartitionImpl(StateStore stateStore, - Schema schema, - Partition partition, - List fileNames, - Supplier idSupplier, - SketchesLoader sketchesLoader) { - this.stateStore = stateStore; + public FindPartitionSplitPoint(Schema schema, List fileNames, SketchesLoader sketchesLoader) { this.schema = schema; this.rowKeyTypes = schema.getRowKeyTypes(); - this.partition = partition; this.fileNames = fileNames; - this.rangeFactory = new RangeFactory(schema); - this.idSupplier = idSupplier; this.sketchesLoader = sketchesLoader; } - void splitPartition() throws StateStoreException, IOException { - for (int dimension = 0; dimension < rowKeyTypes.size(); dimension++) { - Optional splitPointOpt = splitPointForDimension(dimension); - if (splitPointOpt.isPresent()) { - splitPartition(partition, splitPointOpt.get(), dimension); - return; - } - } - } - - public Optional splitPointForDimension(int dimension) throws IOException { + public Optional splitPointForDimension(int dimension) { PrimitiveType rowKeyType = rowKeyTypes.get(dimension); LOGGER.info("Testing field {} of type {} (dimension {}) to see if it can be split", schema.getRowKeyFieldNames().get(dimension), rowKeyType, dimension); @@ -146,7 +101,7 @@ private > Optional splitPointForDimension( } } - private Triple getMinMedianMaxIntKey(int dimension) throws IOException { + private Triple getMinMedianMaxIntKey(int dimension) { String keyField = schema.getRowKeyFields().get(dimension).getName(); // Read all sketches @@ -154,7 +109,7 @@ private Triple getMinMedianMaxIntKey(int dimension) t for (String fileName : fileNames) { String sketchesFile = fileName.replace(".parquet", ".sketches"); LOGGER.info("Loading Sketches from {}", sketchesFile); - Sketches sketches = sketchesLoader.load(sketchesFile); + Sketches sketches = loadSketches(sketchesFile); sketchList.add(sketches.getQuantilesSketch(keyField)); } @@ -171,7 +126,7 @@ private Triple getMinMedianMaxIntKey(int dimension) t return new ImmutableTriple<>(min, median, max); } - private Triple getMinMedianMaxLongKey(int dimension) throws IOException { + private Triple getMinMedianMaxLongKey(int dimension) { String keyField = schema.getRowKeyFields().get(dimension).getName(); // Read all sketches @@ -179,7 +134,7 @@ private Triple getMinMedianMaxLongKey(int dimension) throws IO for (String fileName : fileNames) { String sketchesFile = fileName.replace(".parquet", ".sketches"); LOGGER.info("Loading Sketches from {}", sketchesFile); - Sketches sketches = sketchesLoader.load(sketchesFile); + Sketches sketches = loadSketches(sketchesFile); sketchList.add(sketches.getQuantilesSketch(keyField)); } @@ -196,7 +151,7 @@ private Triple getMinMedianMaxLongKey(int dimension) throws IO return new ImmutableTriple<>(min, median, max); } - private Triple getMinMedianMaxStringKey(int dimension) throws IOException { + private Triple getMinMedianMaxStringKey(int dimension) { String keyField = schema.getRowKeyFields().get(dimension).getName(); // Read all sketches @@ -204,7 +159,7 @@ private Triple getMinMedianMaxStringKey(int dimension) t for (String fileName : fileNames) { String sketchesFile = fileName.replace(".parquet", ".sketches"); LOGGER.info("Loading Sketches from {}", sketchesFile); - Sketches sketches = sketchesLoader.load(sketchesFile); + Sketches sketches = loadSketches(sketchesFile); sketchList.add(sketches.getQuantilesSketch(keyField)); } @@ -221,7 +176,7 @@ private Triple getMinMedianMaxStringKey(int dimension) t return new ImmutableTriple<>(min, median, max); } - private Triple getMinMedianMaxByteArrayKey(int dimension) throws IOException { + private Triple getMinMedianMaxByteArrayKey(int dimension) { String keyField = schema.getRowKeyFields().get(dimension).getName(); // Read all sketches @@ -229,7 +184,7 @@ private Triple getMinMedianMaxByteArrayKey(int for (String fileName : fileNames) { String sketchesFile = fileName.replace(".parquet", ".sketches"); LOGGER.info("Loading Sketches from {}", sketchesFile); - Sketches sketches = sketchesLoader.load(sketchesFile); + Sketches sketches = loadSketches(sketchesFile); sketchList.add(sketches.getQuantilesSketch(keyField)); } @@ -246,63 +201,20 @@ private Triple getMinMedianMaxByteArrayKey(int return new ImmutableTriple<>(min, median, max); } - private List removeRange(List inputRanges, String rangeToRemove) { - return inputRanges.stream() - .filter(r -> !r.getFieldName().equals(rangeToRemove)) - .collect(Collectors.toList()); - } - - private void splitPartition(Partition partition, Object splitPoint, int dimension) throws StateStoreException { - Field fieldToSplitOn = schema.getRowKeyFields().get(dimension); - LOGGER.info("Splitting partition {} on split point {} in dimension {}", partition.getId(), splitPoint, dimension); - - // New partitions - List leftChildRanges = removeRange(partition.getRegion().getRanges(), fieldToSplitOn.getName()); - Range rangeForSplitDimensionLeftChild = rangeFactory.createRange(fieldToSplitOn, partition.getRegion().getRange(fieldToSplitOn.getName()).getMin(), splitPoint); - leftChildRanges.add(rangeForSplitDimensionLeftChild); - Region leftChildRegion = new Region(leftChildRanges); - Partition leftChild = Partition.builder() - .region(leftChildRegion) - .id(idSupplier.get()) - .leafPartition(true) - .parentPartitionId(partition.getId()) - .childPartitionIds(new ArrayList<>()) - .dimension(-1) - .build(); - - List rightChildRanges = removeRange(partition.getRegion().getRanges(), fieldToSplitOn.getName()); - Range rangeForSplitDimensionRightChild = rangeFactory.createRange(fieldToSplitOn, splitPoint, partition.getRegion().getRange(fieldToSplitOn.getName()).getMax()); - rightChildRanges.add(rangeForSplitDimensionRightChild); - Region rightChildRegion = new Region(rightChildRanges); - Partition rightChild = Partition.builder() - .region(rightChildRegion) - .id(idSupplier.get()) - .leafPartition(true) - .parentPartitionId(partition.getId()) - .childPartitionIds(new ArrayList<>()) - .dimension(-1) - .build(); - - // Updated split partition - partition = partition.toBuilder() - .leafPartition(false) - .childPartitionIds(Arrays.asList(leftChild.getId(), rightChild.getId())) - .dimension(dimension).build(); - - LOGGER.info("Updating StateStore:"); - LOGGER.info("Split partition ({}) is marked as not a leaf partition, split on field {}", - partition.getId(), fieldToSplitOn.getName()); - LOGGER.info("New partition: {}", leftChild); - LOGGER.info("New partition: {}", rightChild); - - stateStore.atomicallyUpdatePartitionAndCreateNewOnes(partition, leftChild, rightChild); + private Sketches loadSketches(String filename) { + try { + return sketchesLoader.load(filename); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } public interface SketchesLoader { Sketches load(String filename) throws IOException; } - public static SketchesLoader loadFromFile(Schema schema, Configuration conf) { + public static SketchesLoader loadSketchesFromFile(Schema schema, Configuration conf) { return (filename) -> new SketchesSerDeToS3(schema).loadFromHadoopFS(new Path(filename), conf); } + } diff --git a/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/SplitPartition.java b/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/SplitPartition.java new file mode 100644 index 0000000000..c317d45a0c --- /dev/null +++ b/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/SplitPartition.java @@ -0,0 +1,116 @@ +/* + * Copyright 2022-2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package sleeper.splitter.split; + +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import sleeper.core.partition.Partition; +import sleeper.core.schema.Field; +import sleeper.core.schema.Schema; +import sleeper.core.statestore.StateStore; +import sleeper.core.statestore.StateStoreException; +import sleeper.splitter.split.FindPartitionSplitPoint.SketchesLoader; + +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.function.Supplier; +import java.util.stream.IntStream; + +import static sleeper.splitter.split.FindPartitionSplitPoint.loadSketchesFromFile; + +/** + * Splits a partition. Identifies the median value of the first dimension. If that leads to a valid split (i.e. one + * where it is not equal to the minimum value and not equal to the maximum value) then that is used to split the + * partition. If it doesn't lead to a valid split then the above is repeated for the second dimension. This continues + * until either a valid split is found or no split is possible. + *

+ * Note that there are two situations in which a partition cannot be split: + * - If the partition consists of a single point (i.e. the minimum equals the maximum). + * - If the median equals the minimum then the partition cannot be split. + * This is because it would have to be split into [min, median) and [median, max), but if the min equals the median then + * the left one can't have any data in it as a key x in it would have to have min <= x < median = min which is a + * contradiction. + */ +public class SplitPartition { + public static final Logger LOGGER = LoggerFactory.getLogger(SplitPartition.class); + + private final StateStore stateStore; + private final Schema schema; + private final SketchesLoader sketchesLoader; + private final Supplier idSupplier; + + public SplitPartition(StateStore stateStore, + Schema schema, + Configuration conf) { + this(stateStore, schema, loadSketchesFromFile(schema, conf)); + } + + public SplitPartition(StateStore stateStore, + Schema schema, + SketchesLoader sketchesLoader) { + this(stateStore, schema, sketchesLoader, () -> UUID.randomUUID().toString()); + } + + public SplitPartition(StateStore stateStore, + Schema schema, + SketchesLoader sketchesLoader, + Supplier idSupplier) { + this.stateStore = stateStore; + this.schema = schema; + this.sketchesLoader = sketchesLoader; + this.idSupplier = idSupplier; + } + + public void splitPartition(Partition partition, List fileNames) { + getResultIfSplittable(partition, fileNames) + .ifPresent(this::apply); + } + + private Optional getResultIfSplittable(Partition partition, List fileNames) { + FindPartitionSplitPoint findSplitPoint = new FindPartitionSplitPoint(schema, fileNames, sketchesLoader); + return IntStream.range(0, schema.getRowKeyFields().size()) + .mapToObj(dimension -> findSplitPoint.splitPointForDimension(dimension) + .map(splitPoint -> resultFactory().splitPartition(partition, splitPoint, dimension))) + .flatMap(Optional::stream) + .findFirst(); + } + + private SplitPartitionResultFactory resultFactory() { + return new SplitPartitionResultFactory(schema, idSupplier); + } + + private void apply(SplitPartitionResult result) { + + Partition parentPartition = result.getParentPartition(); + Partition leftChild = result.getLeftChild(); + Partition rightChild = result.getRightChild(); + Field splitField = schema.getRowKeyFields().get(parentPartition.getDimension()); + LOGGER.info("Updating StateStore:"); + LOGGER.info("Split partition ({}) is marked as not a leaf partition, split on field {}", + parentPartition.getId(), splitField.getName()); + LOGGER.info("New partition: {}", leftChild); + LOGGER.info("New partition: {}", rightChild); + + try { + stateStore.atomicallyUpdatePartitionAndCreateNewOnes(parentPartition, leftChild, rightChild); + } catch (StateStoreException e) { + throw new RuntimeException(e); + } + } +} diff --git a/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/SplitPartitionResult.java b/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/SplitPartitionResult.java new file mode 100644 index 0000000000..3f31466f86 --- /dev/null +++ b/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/SplitPartitionResult.java @@ -0,0 +1,43 @@ +/* + * Copyright 2022-2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package sleeper.splitter.split; + +import sleeper.core.partition.Partition; + +public class SplitPartitionResult { + + private final Partition parentPartition; + private final Partition leftChild; + private final Partition rightChild; + + public SplitPartitionResult(Partition parentPartition, Partition leftChild, Partition rightChild) { + this.parentPartition = parentPartition; + this.leftChild = leftChild; + this.rightChild = rightChild; + } + + public Partition getParentPartition() { + return parentPartition; + } + + public Partition getLeftChild() { + return leftChild; + } + + public Partition getRightChild() { + return rightChild; + } +} diff --git a/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/SplitPartitionResultFactory.java b/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/SplitPartitionResultFactory.java new file mode 100644 index 0000000000..73983aa26a --- /dev/null +++ b/java/splitter/splitter-core/src/main/java/sleeper/splitter/split/SplitPartitionResultFactory.java @@ -0,0 +1,109 @@ +/* + * Copyright 2022-2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package sleeper.splitter.split; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import sleeper.core.partition.Partition; +import sleeper.core.range.Range; +import sleeper.core.range.Range.RangeFactory; +import sleeper.core.range.Region; +import sleeper.core.schema.Field; +import sleeper.core.schema.Schema; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** + * Identifies the median value of the first dimension. If that leads to a valid + * split (i.e. one where it is not equal to the minimum value and not equal to + * the maximum value) then that is used to split the partition. If it doesn't + * lead to a valid split then the above is repeated for the second dimension. + * This continues until either a valid split is found or no split is possible. + *

+ * Note that there are two situations in which a partition cannot be split: + * - If the partition consists of a single point (i.e. the minimum + * equals the maximum). + * - If the median equals the minimum then the partition cannot be split. + * This is because it would have to be split into [min, median) and [median, max), + * but if the min equals the median then the left one can't have any data in it + * as a key x in it would have to have min <= x < median = min which is a + * contradiction. + *

+ */ +public class SplitPartitionResultFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(SplitPartitionResultFactory.class); + + private final Schema schema; + private final RangeFactory rangeFactory; + private final Supplier idSupplier; + + public SplitPartitionResultFactory(Schema schema, Supplier idSupplier) { + this.schema = schema; + this.rangeFactory = new RangeFactory(schema); + this.idSupplier = idSupplier; + } + + public SplitPartitionResult splitPartition(Partition partition, Object splitPoint, int dimension) { + Field fieldToSplitOn = schema.getRowKeyFields().get(dimension); + LOGGER.info("Splitting partition {} on split point {} in dimension {}", partition.getId(), splitPoint, dimension); + + // New partitions + List leftChildRanges = removeRange(partition.getRegion().getRanges(), fieldToSplitOn.getName()); + Range rangeForSplitDimensionLeftChild = rangeFactory.createRange(fieldToSplitOn, partition.getRegion().getRange(fieldToSplitOn.getName()).getMin(), splitPoint); + leftChildRanges.add(rangeForSplitDimensionLeftChild); + Region leftChildRegion = new Region(leftChildRanges); + Partition leftChild = Partition.builder() + .region(leftChildRegion) + .id(idSupplier.get()) + .leafPartition(true) + .parentPartitionId(partition.getId()) + .childPartitionIds(new ArrayList<>()) + .dimension(-1) + .build(); + + List rightChildRanges = removeRange(partition.getRegion().getRanges(), fieldToSplitOn.getName()); + Range rangeForSplitDimensionRightChild = rangeFactory.createRange(fieldToSplitOn, splitPoint, partition.getRegion().getRange(fieldToSplitOn.getName()).getMax()); + rightChildRanges.add(rangeForSplitDimensionRightChild); + Region rightChildRegion = new Region(rightChildRanges); + Partition rightChild = Partition.builder() + .region(rightChildRegion) + .id(idSupplier.get()) + .leafPartition(true) + .parentPartitionId(partition.getId()) + .childPartitionIds(new ArrayList<>()) + .dimension(-1) + .build(); + + // Updated split partition + partition = partition.toBuilder() + .leafPartition(false) + .childPartitionIds(Arrays.asList(leftChild.getId(), rightChild.getId())) + .dimension(dimension).build(); + + return new SplitPartitionResult(partition, leftChild, rightChild); + } + + private List removeRange(List inputRanges, String rangeToRemove) { + return inputRanges.stream() + .filter(r -> !r.getFieldName().equals(rangeToRemove)) + .collect(Collectors.toList()); + } +} diff --git a/java/splitter/splitter-core/src/test/java/sleeper/splitter/FindPartitionsToSplitTest.java b/java/splitter/splitter-core/src/test/java/sleeper/splitter/find/FindPartitionsToSplitTest.java similarity index 99% rename from java/splitter/splitter-core/src/test/java/sleeper/splitter/FindPartitionsToSplitTest.java rename to java/splitter/splitter-core/src/test/java/sleeper/splitter/find/FindPartitionsToSplitTest.java index 4d119cb8bf..c299d631de 100644 --- a/java/splitter/splitter-core/src/test/java/sleeper/splitter/FindPartitionsToSplitTest.java +++ b/java/splitter/splitter-core/src/test/java/sleeper/splitter/find/FindPartitionsToSplitTest.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package sleeper.splitter; +package sleeper.splitter.find; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; diff --git a/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionJobDefinitionSerDeTest.java b/java/splitter/splitter-core/src/test/java/sleeper/splitter/find/SplitPartitionJobDefinitionSerDeTest.java similarity index 99% rename from java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionJobDefinitionSerDeTest.java rename to java/splitter/splitter-core/src/test/java/sleeper/splitter/find/SplitPartitionJobDefinitionSerDeTest.java index 6a4e2b51d2..525957341f 100644 --- a/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionJobDefinitionSerDeTest.java +++ b/java/splitter/splitter-core/src/test/java/sleeper/splitter/find/SplitPartitionJobDefinitionSerDeTest.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package sleeper.splitter; +package sleeper.splitter.find; import org.junit.jupiter.api.Test; diff --git a/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionIT.java b/java/splitter/splitter-core/src/test/java/sleeper/splitter/split/SplitPartitionTest.java similarity index 84% rename from java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionIT.java rename to java/splitter/splitter-core/src/test/java/sleeper/splitter/split/SplitPartitionTest.java index f3d5bca607..c4057a78f5 100644 --- a/java/splitter/splitter-core/src/test/java/sleeper/splitter/SplitPartitionIT.java +++ b/java/splitter/splitter-core/src/test/java/sleeper/splitter/split/SplitPartitionTest.java @@ -13,14 +13,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package sleeper.splitter; +package sleeper.splitter.split; -import org.apache.hadoop.conf.Configuration; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; import sleeper.core.partition.Partition; import sleeper.core.partition.PartitionTree; @@ -33,46 +30,32 @@ import sleeper.core.schema.type.LongType; import sleeper.core.schema.type.StringType; import sleeper.core.statestore.FileReference; +import sleeper.core.statestore.FileReferenceFactory; import sleeper.core.statestore.StateStore; -import sleeper.ingest.IngestRecordsFromIterator; -import sleeper.ingest.impl.IngestCoordinator; -import sleeper.ingest.impl.ParquetConfiguration; -import sleeper.ingest.impl.partitionfilewriter.DirectPartitionFileWriterFactory; -import sleeper.ingest.impl.recordbatch.arraylist.ArrayListRecordBatchFactory; -import sleeper.ingest.testutils.IngestCoordinatorTestHelper; - -import java.io.IOException; -import java.nio.file.Path; +import sleeper.sketches.Sketches; +import sleeper.splitter.split.FindPartitionSplitPoint.SketchesLoader; + import java.util.ArrayList; import java.util.Arrays; -import java.util.Iterator; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.LongStream; import java.util.stream.Stream; -import static java.nio.file.Files.createTempDirectory; import static org.assertj.core.api.Assertions.assertThat; import static sleeper.core.statestore.inmemory.StateStoreTestHelper.inMemoryStateStoreWithPartitions; -import static sleeper.splitter.SplitMultiDimensionalPartitionImpl.loadFromFile; - -public class SplitPartitionIT { - @TempDir - public Path folder; +public class SplitPartitionTest { private final Field field = new Field("key", new IntType()); private final Schema schema = Schema.builder().rowKeyFields(field).build(); - private String localDir; - private String filePathPrefix; - @BeforeEach - void setUp() throws IOException { - localDir = createTempDirectory(folder, null).toString(); - filePathPrefix = createTempDirectory(folder, null).toString(); - } + private final Map fileToSketchMap = new HashMap<>(); @Nested @DisplayName("Skip split") @@ -98,7 +81,7 @@ public void shouldNotSplitPartitionForIntKeyIfItCannotBeSplitBecausePartitionIsO record.put("key", r); records.add(record); } - ingestFileFromRecords(schema, stateStore, records.stream()); + ingestRecordsToSketchOnPartition(schema, stateStore, partition.getId(), records.stream()); } } @@ -140,7 +123,7 @@ public void shouldNotSplitPartitionForIntKeyIfItCannotBeSplitBecauseDataIsConsta records.add(record); } } - ingestFileFromRecords(schema, stateStore, records.stream()); + ingestRecordsToSketchOnPartition(schema, stateStore, partition.getId(), records.stream()); } } @@ -167,11 +150,6 @@ public void shouldNotSplitPartitionForByteArrayKeyIfItCannotBeSplitBecausePartit for (Partition partition : tree.getAllPartitions()) { for (int i = 0; i < 10; i++) { List records = new ArrayList<>(); - for (int r = 0; r < 100; r++) { - Record record = new Record(); - record.put("key", new byte[]{(byte) r}); - records.add(record); - } if (partition.getId().equals("id1")) { int j = 0; for (byte r = (byte) 0; @@ -194,7 +172,7 @@ public void shouldNotSplitPartitionForByteArrayKeyIfItCannotBeSplitBecausePartit records.add(record); } } - ingestFileFromRecords(schema, stateStore, records.stream()); + ingestRecordsToSketchOnPartition(schema, stateStore, partition.getId(), records.stream()); } } @@ -243,7 +221,7 @@ public void shouldNotSplitPartitionForByteArrayKeyIfItCannotBeSplitBecauseDataIs records.add(record); } } - ingestFileFromRecords(schema, stateStore, records.stream()); + ingestRecordsToSketchOnPartition(schema, stateStore, partition.getId(), records.stream()); } } @@ -266,7 +244,7 @@ void shouldSplitPartitionForIntKey() throws Exception { .singlePartition("A") .buildList()); IntStream.range(0, 10) - .forEach(i -> ingestFileFromRecords(schema, stateStore, + .forEach(i -> ingestRecordsToSketchOnPartition(schema, stateStore, "A", IntStream.range(100 * i, 100 * (i + 1)) .mapToObj(r -> new Record(Map.of("key", r))))); @@ -289,7 +267,7 @@ void shouldSplitPartitionForLongKey() throws Exception { .singlePartition("A") .buildList()); IntStream.range(0, 10) - .forEach(i -> ingestFileFromRecords(schema, stateStore, + .forEach(i -> ingestRecordsToSketchOnPartition(schema, stateStore, "A", LongStream.range(100L * i, 100L * (i + 1)) .mapToObj(r -> new Record(Map.of("key", r))))); @@ -312,7 +290,7 @@ void shouldSplitPartitionForStringKey() throws Exception { .singlePartition("A") .buildList()); IntStream.range(0, 10) - .forEach(i -> ingestFileFromRecords(schema, stateStore, + .forEach(i -> ingestRecordsToSketchOnPartition(schema, stateStore, "A", IntStream.range(0, 100) .mapToObj(r -> new Record(Map.of("key", String.format("A%s%s", i, r)))))); @@ -335,7 +313,7 @@ void shouldSplitPartitionForByteArrayKey() throws Exception { .singlePartition("A") .buildList()); IntStream.range(0, 10) - .forEach(i -> ingestFileFromRecords(schema, stateStore, + .forEach(i -> ingestRecordsToSketchOnPartition(schema, stateStore, "A", IntStream.range(0, 100) .mapToObj(r -> new Record(Map.of("key", new byte[]{(byte) r}))))); @@ -364,7 +342,7 @@ public void shouldSplitIntKeyOnFirstDimension() throws Exception { .singlePartition("A") .buildList()); IntStream.range(0, 10) - .forEach(i -> ingestFileFromRecords(schema, stateStore, + .forEach(i -> ingestRecordsToSketchOnPartition(schema, stateStore, "A", IntStream.range(0, 100) .mapToObj(r -> new Record(Map.of( "key1", r, @@ -391,7 +369,7 @@ public void shouldSplitIntKeyOnSecondDimensionWhenAllValuesForFirstKeyAreTheSame .singlePartition("A") .buildList()); IntStream.range(0, 10) - .forEach(i -> ingestFileFromRecords(schema, stateStore, + .forEach(i -> ingestRecordsToSketchOnPartition(schema, stateStore, "A", IntStream.range(0, 100) .mapToObj(r -> new Record(Map.of( "key1", 10, @@ -418,7 +396,7 @@ public void shouldSplitIntKeyOnFirstDimensionWhenSecondDimensionCanBeSplit() thr .singlePartition("A") .buildList()); IntStream.range(0, 10) - .forEach(i -> ingestFileFromRecords(schema, stateStore, + .forEach(i -> ingestRecordsToSketchOnPartition(schema, stateStore, "A", IntStream.range(0, 100) .mapToObj(r -> new Record(Map.of( "key1", r, @@ -445,7 +423,7 @@ public void shouldSplitIntKeyOnSecondDimensionWhenMinAndMedianForFirstKeyAreTheS .singlePartition("A") .buildList()); IntStream.range(0, 10) - .forEach(i -> ingestFileFromRecords(schema, stateStore, + .forEach(i -> ingestRecordsToSketchOnPartition(schema, stateStore, "A", IntStream.range(0, 100) // The majority of the values are 10; so min should equal median .mapToObj(r -> new Record(Map.of( @@ -474,7 +452,7 @@ public void shouldSplitByteKeyOnFirstDimension() throws Exception { .buildList()); IntStream.range(0, 10) - .forEach(i -> ingestFileFromRecords(schema, stateStore, + .forEach(i -> ingestRecordsToSketchOnPartition(schema, stateStore, "A", IntStream.range(0, 100) .mapToObj(r -> new Record(Map.of( "key1", new byte[]{(byte) r}, @@ -501,7 +479,7 @@ public void shouldSplitByteKeyOnSecondDimensionWhenAllValuesForFirstKeyAreTheSam .singlePartition("A") .buildList()); IntStream.range(0, 10) - .forEach(i -> ingestFileFromRecords(schema, stateStore, + .forEach(i -> ingestRecordsToSketchOnPartition(schema, stateStore, "A", IntStream.range(0, 100) .mapToObj(r -> new Record(Map.of( "key1", new byte[]{(byte) -100}, @@ -519,50 +497,49 @@ public void shouldSplitByteKeyOnSecondDimensionWhenAllValuesForFirstKeyAreTheSam } } - private static void ingestRecordsFromIterator( - Schema schema, StateStore stateStore, String localDir, - String filePathPrefix, Iterator recordIterator) throws Exception { - ParquetConfiguration parquetConfiguration = IngestCoordinatorTestHelper.parquetConfiguration(schema, new Configuration()); - IngestCoordinator ingestCoordinator = IngestCoordinatorTestHelper.standardIngestCoordinator( - stateStore, schema, - ArrayListRecordBatchFactory.builder() - .localWorkingDirectory(localDir) - .maxNoOfRecordsInMemory(1000000) - .maxNoOfRecordsInLocalStore(100000000) - .parquetConfiguration(parquetConfiguration) - .buildAcceptingRecords(), - DirectPartitionFileWriterFactory.from(parquetConfiguration, filePathPrefix)); - new IngestRecordsFromIterator(ingestCoordinator, recordIterator).write(); - } + private void ingestRecordsToSketchOnPartition(Schema schema, StateStore stateStore, String partitionId, Stream recordsStream) { + Sketches sketches = Sketches.from(schema); + AtomicLong recordCount = new AtomicLong(); + + recordsStream.forEach(rec -> { + sketches.update(schema, rec); + recordCount.incrementAndGet(); + }); - private void ingestFileFromRecords(Schema schema, StateStore stateStore, Stream recordsStream) { + FileReference recordFileReference = FileReferenceFactory.from(stateStore).partitionFile(partitionId, UUID.randomUUID().toString(), recordCount.get()); try { - ingestRecordsFromIterator(schema, stateStore, localDir, filePathPrefix, recordsStream.iterator()); - } catch (Exception e) { - throw new RuntimeException(e); + stateStore.addFile(recordFileReference); + } catch (Exception ex) { + throw new RuntimeException(ex); } + + fileToSketchMap.put(recordFileReference.getFilename(), sketches); } - private static void splitSinglePartition(Schema schema, StateStore stateStore, Supplier generateIds) throws Exception { + private void splitSinglePartition(Schema schema, StateStore stateStore, Supplier generateIds) throws Exception { Partition partition = stateStore.getAllPartitions().get(0); List fileNames = stateStore.getFileReferences().stream() .map(FileReference::getFilename) .collect(Collectors.toList()); - SplitPartition partitionSplitter = new SplitPartition(stateStore, schema, loadFromFile(schema, new Configuration()), generateIds); + SplitPartition partitionSplitter = new SplitPartition(stateStore, schema, loadSketchesFromMap(), generateIds); partitionSplitter.splitPartition(partition, fileNames); } - private static void splitPartition(Schema schema, StateStore stateStore, String partitionId, Supplier generateIds) throws Exception { + private void splitPartition(Schema schema, StateStore stateStore, String partitionId, Supplier generateIds) throws Exception { PartitionTree tree = new PartitionTree(stateStore.getAllPartitions()); Partition partition = tree.getPartition(partitionId); List fileNames = stateStore.getFileReferences().stream() .filter(file -> partitionId.equals(file.getPartitionId())) .map(FileReference::getFilename) .collect(Collectors.toList()); - SplitPartition partitionSplitter = new SplitPartition(stateStore, schema, loadFromFile(schema, new Configuration()), generateIds); + SplitPartition partitionSplitter = new SplitPartition(stateStore, schema, loadSketchesFromMap(), generateIds); partitionSplitter.splitPartition(partition, fileNames); } + public SketchesLoader loadSketchesFromMap() { + return (filename) -> fileToSketchMap.get(filename); + } + private static Supplier generateIds(String... ids) { return Arrays.stream(ids).iterator()::next; } diff --git a/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/FindPartitionsToSplitLambda.java b/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/FindPartitionsToSplitLambda.java index d598f9108c..fe1ce7a674 100644 --- a/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/FindPartitionsToSplitLambda.java +++ b/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/FindPartitionsToSplitLambda.java @@ -37,7 +37,7 @@ import sleeper.core.table.TableStatus; import sleeper.core.util.LoggedDuration; import sleeper.io.parquet.utils.HadoopConfigurationProvider; -import sleeper.splitter.FindPartitionsToSplit; +import sleeper.splitter.find.FindPartitionsToSplit; import sleeper.statestore.StateStoreProvider; import java.time.Instant; diff --git a/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/SplitPartitionLambda.java b/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/SplitPartitionLambda.java index b83220b4d8..dc70ab849e 100644 --- a/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/SplitPartitionLambda.java +++ b/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/SplitPartitionLambda.java @@ -19,6 +19,8 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; +import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse.BatchItemFailure; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; @@ -31,22 +33,21 @@ import sleeper.configuration.properties.table.TableProperties; import sleeper.configuration.properties.table.TablePropertiesProvider; import sleeper.core.statestore.StateStore; -import sleeper.core.statestore.StateStoreException; import sleeper.io.parquet.utils.HadoopConfigurationProvider; -import sleeper.splitter.SplitPartition; -import sleeper.splitter.SplitPartitionJobDefinition; -import sleeper.splitter.SplitPartitionJobDefinitionSerDe; +import sleeper.splitter.find.SplitPartitionJobDefinition; +import sleeper.splitter.find.SplitPartitionJobDefinitionSerDe; +import sleeper.splitter.split.SplitPartition; import sleeper.statestore.StateStoreProvider; -import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET; /** * Triggered by an SQS event containing a partition splitting job to do. */ -@SuppressWarnings("unused") -public class SplitPartitionLambda implements RequestHandler { +public class SplitPartitionLambda implements RequestHandler { private final PropertiesReloader propertiesReloader; private final Configuration conf; private static final Logger LOGGER = LoggerFactory.getLogger(SplitPartitionLambda.class); @@ -70,22 +71,23 @@ public SplitPartitionLambda() { } @Override - public Void handleRequest(SQSEvent event, Context context) { + public SQSBatchResponse handleRequest(SQSEvent event, Context context) { propertiesReloader.reloadIfNeeded(); - try { - for (SQSEvent.SQSMessage message : event.getRecords()) { - String serialisedJob = message.getBody(); + List batchItemFailures = new ArrayList<>(); + for (SQSEvent.SQSMessage message : event.getRecords()) { + try { SplitPartitionJobDefinition job = new SplitPartitionJobDefinitionSerDe(tablePropertiesProvider) - .fromJson(serialisedJob); + .fromJson(message.getBody()); LOGGER.info("Received partition splitting job {}", job); TableProperties tableProperties = tablePropertiesProvider.getById(job.getTableId()); StateStore stateStore = stateStoreProvider.getStateStore(tableProperties); SplitPartition splitPartition = new SplitPartition(stateStore, tableProperties.getSchema(), conf); splitPartition.splitPartition(job.getPartition(), job.getFileNames()); + } catch (RuntimeException e) { + LOGGER.error("Failed partition splitting", e); + batchItemFailures.add(new BatchItemFailure(message.getMessageId())); } - } catch (IOException | StateStoreException ex) { - LOGGER.error("Exception handling partition splitting job", ex); } - return null; + return new SQSBatchResponse(batchItemFailures); } } diff --git a/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/SqsSplitPartitionJobSender.java b/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/SqsSplitPartitionJobSender.java index 49ac998e22..9bdf27e5ae 100644 --- a/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/SqsSplitPartitionJobSender.java +++ b/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/SqsSplitPartitionJobSender.java @@ -23,8 +23,8 @@ import sleeper.configuration.properties.instance.InstanceProperties; import sleeper.configuration.properties.table.TablePropertiesProvider; -import sleeper.splitter.SplitPartitionJobDefinition; -import sleeper.splitter.SplitPartitionJobDefinitionSerDe; +import sleeper.splitter.find.SplitPartitionJobDefinition; +import sleeper.splitter.find.SplitPartitionJobDefinitionSerDe; import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.PARTITION_SPLITTING_JOB_QUEUE_URL; diff --git a/java/splitter/splitter-lambda/src/test/java/sleeper/splitter/lambda/FindPartitionsToSplitIT.java b/java/splitter/splitter-lambda/src/test/java/sleeper/splitter/lambda/FindPartitionsToSplitIT.java index 877ed9e9cc..4666a918c7 100644 --- a/java/splitter/splitter-lambda/src/test/java/sleeper/splitter/lambda/FindPartitionsToSplitIT.java +++ b/java/splitter/splitter-lambda/src/test/java/sleeper/splitter/lambda/FindPartitionsToSplitIT.java @@ -45,9 +45,9 @@ import sleeper.ingest.impl.ParquetConfiguration; import sleeper.ingest.impl.partitionfilewriter.DirectPartitionFileWriterFactory; import sleeper.ingest.impl.recordbatch.arraylist.ArrayListRecordBatchFactory; -import sleeper.splitter.FindPartitionsToSplit; -import sleeper.splitter.SplitPartitionJobDefinition; -import sleeper.splitter.SplitPartitionJobDefinitionSerDe; +import sleeper.splitter.find.FindPartitionsToSplit; +import sleeper.splitter.find.SplitPartitionJobDefinition; +import sleeper.splitter.find.SplitPartitionJobDefinitionSerDe; import sleeper.statestore.FixedStateStoreProvider; import java.io.File; diff --git a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/partitioning/WaitForPartitionSplitting.java b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/partitioning/WaitForPartitionSplitting.java index 531495eac2..1b954d3ae7 100644 --- a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/partitioning/WaitForPartitionSplitting.java +++ b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/partitioning/WaitForPartitionSplitting.java @@ -24,8 +24,8 @@ import sleeper.core.statestore.StateStore; import sleeper.core.statestore.StateStoreException; import sleeper.core.util.PollWithRetries; -import sleeper.splitter.FindPartitionToSplitResult; -import sleeper.splitter.FindPartitionsToSplit; +import sleeper.splitter.find.FindPartitionToSplitResult; +import sleeper.splitter.find.FindPartitionsToSplit; import sleeper.systemtest.dsl.instance.SystemTestInstanceContext; import java.time.Duration; diff --git a/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/drivers/InMemoryPartitionSplittingDriver.java b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/drivers/InMemoryPartitionSplittingDriver.java index eb970e8577..9613ff0520 100644 --- a/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/drivers/InMemoryPartitionSplittingDriver.java +++ b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/drivers/InMemoryPartitionSplittingDriver.java @@ -21,14 +21,12 @@ import sleeper.configuration.properties.table.TableProperties; import sleeper.core.statestore.StateStore; import sleeper.core.statestore.StateStoreException; -import sleeper.splitter.FindPartitionsToSplit; -import sleeper.splitter.FindPartitionsToSplit.JobSender; -import sleeper.splitter.SplitPartition; +import sleeper.splitter.find.FindPartitionsToSplit; +import sleeper.splitter.find.FindPartitionsToSplit.JobSender; +import sleeper.splitter.split.SplitPartition; import sleeper.systemtest.dsl.instance.SystemTestInstanceContext; import sleeper.systemtest.dsl.partitioning.PartitionSplittingDriver; -import java.io.IOException; - public class InMemoryPartitionSplittingDriver implements PartitionSplittingDriver { public static final Logger LOGGER = LoggerFactory.getLogger(InMemoryPartitionSplittingDriver.class); private final SystemTestInstanceContext instance; @@ -59,11 +57,7 @@ private JobSender splitPartition() { TableProperties tableProperties = instance.getTablePropertiesProvider().getById(job.getTableId()); StateStore stateStore = instance.getStateStoreProvider().getStateStore(tableProperties); SplitPartition splitPartition = new SplitPartition(stateStore, tableProperties.getSchema(), sketches::load); - try { - splitPartition.splitPartition(job.getPartition(), job.getFileNames()); - } catch (IOException | StateStoreException e) { - throw new RuntimeException("Failed to split partition", e); - } + splitPartition.splitPartition(job.getPartition(), job.getFileNames()); }; } }