Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk Export Query Splitter #3922

Merged
merged 24 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
478e966
Adding BulkExportQuerySplitter class
ab295382 Nov 29, 2024
7cf6b00
Merge branch '3541-lambda-Logic-to-create-bulk-export-partition-queri…
ab295382 Nov 29, 2024
3e52cc0
Merge branch '3541-lambda-Logic-to-create-bulk-export-partition-queri…
ab295382 Dec 4, 2024
b73d8ae
Added regions
ab295382 Dec 5, 2024
37a3990
Merge branch '3541-lambda-Logic-to-create-bulk-export-partition-queri…
ab295382 Dec 12, 2024
760ebda
Merge branch 'develop' of github.com:gchq/sleeper into 3818-bulk-expo…
ab295382 Dec 12, 2024
6fb9517
Refactor into core - model
ab295382 Dec 12, 2024
428d2e1
Adding WIP tests for BulkExportQuerySplitter
ab295382 Dec 12, 2024
f507e5d
Adding Javadoc
ab295382 Dec 13, 2024
64315dc
Merge branch 'develop' into 3818-bulk-export-query-splitter
ab295382 Dec 16, 2024
2147f07
Merge branch 'develop' into 3818-bulk-export-query-splitter
ab295382 Dec 16, 2024
7160b6f
Merge branch 'develop' of github.com:gchq/sleeper into 3818-bulk-expo…
ab295382 Dec 16, 2024
2b39944
Updated tests for BulkExportQuerySplitter
ab295382 Dec 16, 2024
c551421
Adding Java docs
ab295382 Dec 17, 2024
2feccfe
Merge branch 'develop' of github.com:gchq/sleeper into 3818-bulk-expo…
ab295382 Dec 17, 2024
a1a5fc1
Updated Javadocs and removed duplicate method.
ab295382 Dec 19, 2024
0173c54
Merge branch 'develop' of github.com:gchq/sleeper into 3818-bulk-expo…
ab295382 Dec 19, 2024
1618bf8
Removing records from tests
ab295382 Dec 19, 2024
ded0daf
Removed code no longer needed
ab295382 Dec 19, 2024
bbcc8d4
Adding tests for the init methods on BulkExportQuerySplitterTest
ab295382 Dec 20, 2024
9009e17
Merge branch 'develop' into 3818-bulk-export-query-splitter
patchwork01 Jan 13, 2025
818e4ed
Fix Javadoc
patchwork01 Jan 13, 2025
225f74a
Assert on queries created by BulkExportQuerySplitter
patchwork01 Jan 13, 2025
5e9da14
Fix BulkExportLeafPartitionQuery.toString
patchwork01 Jan 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@
* limitations under the License.
*/

package sleeper.bulkexport.model;
package sleeper.bulkexport.core.model;

import sleeper.core.partition.Partition;
import sleeper.core.properties.table.TableProperties;
import sleeper.core.range.Region;

import java.util.List;
import java.util.Objects;

import static sleeper.core.properties.table.TableProperty.TABLE_ID;

/**
* An export query for a leaf partition. The query contains information about
* which files should be read.
Expand Down Expand Up @@ -65,14 +69,31 @@ public BulkExportLeafPartitionQuery validate() {
}

/**
* Builder class for BulkExportLeafPartitionQuery.
* Creates a builder for this class.
*
* @return a Builder object
* @return the builder
*/
public static Builder builder() {
return new Builder();
}

/**
* Creates a builder for this class, pre-initialised for a given partition.
*
* @param parentQuery the parent query
* @param tableProperties the Sleeper table properties
* @param partition the partition
* @return the builder
*/
public static Builder forPartition(BulkExportQuery parentQuery, TableProperties tableProperties, Partition partition) {
return builder()
.exportId(parentQuery.getExportId())
.tableId(tableProperties.get(TABLE_ID))
.leafPartitionId(partition.getId())
.regions(List.of(partition.getRegion()))
.partitionRegion(partition.getRegion());
}

/**
* Gets the table Id.
*
Expand Down Expand Up @@ -161,7 +182,7 @@ public int hashCode() {

@Override
public String toString() {
return "LeafPartitionQuery{" +
return "BulkExportLeafPartitionQuery{" +
"tableId='" + tableId + '\'' +
", exportId='" + exportId + '\'' +
", subExportId='" + subExportId + '\'' +
Expand All @@ -180,7 +201,7 @@ private static <T> T requireNonNull(T obj, Builder builder, String message) {
}

/**
* Builder for the BulkExportLeafPartitionQuery model.
* Builder for this class.
*/
public static final class Builder {
private String tableId;
Expand All @@ -195,11 +216,10 @@ private Builder() {
}

/**
* Provide the tableId.
* Provide the table ID.
*
* @param tableId the id for the table.
*
* @return the builder object.
* @param tableId the id of the Sleeper table
* @return the builder
*/
public Builder tableId(String tableId) {
this.tableId = tableId;
Expand All @@ -209,9 +229,8 @@ public Builder tableId(String tableId) {
/**
* Provide the exportId.
*
* @param exportId the id for the export.
*
* @return the builder object.
* @param exportId the id for the export
* @return the builder
*/
public Builder exportId(String exportId) {
this.exportId = exportId;
Expand All @@ -221,9 +240,8 @@ public Builder exportId(String exportId) {
/**
* Provide the subExportId.
*
* @param subExportId the id for the sub export.
*
* @return the builder object.
* @param subExportId the id for the sub export
* @return the builder
*/
public Builder subExportId(String subExportId) {
this.subExportId = subExportId;
Expand All @@ -233,33 +251,30 @@ public Builder subExportId(String subExportId) {
/**
* Provide the regions.
*
* @param regions a list of regions.
*
* @return the builder object.
* @param regions a list of regions
* @return the builder
*/
public Builder regions(List<Region> regions) {
this.regions = regions;
return this;
}

/**
* Provide the leafPartitionId.
* Provide the leaf partition ID.
*
* @param leafPartitionId the id for the leaf partition.
*
* @return the builder object.
* @param leafPartitionId the id for the leaf partition
* @return the builder
*/
public Builder leafPartitionId(String leafPartitionId) {
this.leafPartitionId = leafPartitionId;
return this;
}

/**
* Provide the partition regions.
*
* @param partitionRegion a partition regions.
* Provide the partition region.
*
* @return the builder object.
* @param partitionRegion the partition region
* @return the builder
*/
public Builder partitionRegion(Region partitionRegion) {
this.partitionRegion = partitionRegion;
Expand All @@ -269,20 +284,14 @@ public Builder partitionRegion(Region partitionRegion) {
/**
* Provide the leaf partition files.
*
* @param files the files to be exported.
*
* @return the builder object.
* @param files the files to be exported
* @return the builder object
*/
public Builder files(List<String> files) {
this.files = files;
return this;
}

/**
* Builds the BulkExportLeafPartitionQuery.
*
* @return a BulkExportLeafPartitionQuery object.
*/
public BulkExportLeafPartitionQuery build() {
return new BulkExportLeafPartitionQuery(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package sleeper.bulkexport.model;
package sleeper.bulkexport.core.model;

import com.google.gson.JsonElement;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.bulkexport.model;
package sleeper.bulkexport.core.model;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.bulkexport.model;
package sleeper.bulkexport.core.model;

import java.util.Objects;
import java.util.UUID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.bulkexport.model;
package sleeper.bulkexport.core.model;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package sleeper.bulkexport.model;
package sleeper.bulkexport.core.model;

import java.util.Optional;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Copyright 2022-2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.bulkexport.core.recordretrieval;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import sleeper.bulkexport.core.model.BulkExportLeafPartitionQuery;
import sleeper.bulkexport.core.model.BulkExportQuery;
import sleeper.core.partition.Partition;
import sleeper.core.partition.PartitionTree;
import sleeper.core.properties.table.TableProperties;
import sleeper.core.statestore.StateStore;
import sleeper.core.statestore.StateStoreException;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static sleeper.core.properties.table.TableProperty.QUERY_PROCESSOR_CACHE_TIMEOUT;

/**
* Splits up an export query into leaf partition export queries.
*/
public class BulkExportQuerySplitter {
private static final Logger LOGGER = LoggerFactory.getLogger(BulkExportQuerySplitter.class);

// Source of the partitions and the partition-to-files mapping; read on every (re)initialisation.
private final StateStore stateStore;
// Properties of the Sleeper table being exported; supplies the cache timeout and the table status used in logs.
private final TableProperties tableProperties;
// Produces the sub export ID given to each leaf partition query.
private final Supplier<String> idSupplier;
// Produces the current time, used to decide when the cached state has expired.
private final Supplier<Instant> timeSupplier;
// Cached state below is replaced as a whole each time init runs.
// Only the partitions for which Partition.isLeafPartition() is true.
private List<Partition> leafPartitions;
// Tree built from all partitions (leaf and non-leaf); used to look up ancestor IDs.
private PartitionTree partitionTree;
// Partition ID to the files referenced on that partition, as returned by the state store.
private Map<String, List<String>> partitionToFiles;
// Time from which initIfNeeded will reload the cached state.
private Instant nextInitialiseTime;
patchwork01 marked this conversation as resolved.
Show resolved Hide resolved

/**
 * Creates a splitter that uses a random UUID for each sub export ID and takes the current
 * time from {@link Instant#now()}.
 *
 * @param tableProperties the Sleeper table properties
 * @param stateStore      the state store to load partitions and file references from
 */
public BulkExportQuerySplitter(TableProperties tableProperties, StateStore stateStore) {
    this(stateStore,
            tableProperties,
            () -> UUID.randomUUID().toString(),
            () -> Instant.now());
}

public BulkExportQuerySplitter(StateStore stateStore, TableProperties tableProperties, Supplier<String> idSupplier, Supplier<Instant> timeSupplier) {
this.stateStore = stateStore;
this.tableProperties = tableProperties;
this.idSupplier = idSupplier;
this.timeSupplier = timeSupplier;
init(timeSupplier.get());
}

/**
 * Reloads the partitions and the mapping from partitions to files from the state store,
 * but only once the next initialise time has been reached. Before that time the cached
 * state is kept and nothing is loaded.
 *
 * @throws StateStoreException if the state store can't be accessed
 */
public void initIfNeeded() throws StateStoreException {
    Instant currentTime = timeSupplier.get();
    boolean cacheExpired = !nextInitialiseTime.isAfter(currentTime);
    if (cacheExpired) {
        init(currentTime);
        return;
    }
    LOGGER.debug("Not refreshing state for table {}", tableProperties.getStatus());
}

/**
 * Reads all partitions and the partition-to-files mapping from the state store and caches them.
 *
 * @param  now                 the time of this load, from which the next initialise time is derived
 * @throws StateStoreException if the state store can't be accessed
 */
private void init(Instant now) throws StateStoreException {
    init(stateStore.getAllPartitions(), stateStore.getPartitionToReferencedFilesMap(), now);
}

/**
 * Replaces the cached state with the given partitions and file mapping, and sets the time
 * at which the cache next needs reloading.
 *
 * @param partitions             all partitions of the table, leaf and non-leaf
 * @param partitionToFileMapping partition ID to the files referenced on that partition
 * @param now                    the time of this load
 */
private void init(List<Partition> partitions, Map<String, List<String>> partitionToFileMapping, Instant now) {
    List<Partition> leaves = new ArrayList<>();
    for (Partition candidate : partitions) {
        if (candidate.isLeafPartition()) {
            leaves.add(candidate);
        }
    }
    this.leafPartitions = leaves;
    this.partitionTree = new PartitionTree(partitions);
    this.partitionToFiles = partitionToFileMapping;
    // The cache timeout table property is interpreted as a number of minutes
    this.nextInitialiseTime = now.plus(tableProperties.getInt(QUERY_PROCESSOR_CACHE_TIMEOUT), ChronoUnit.MINUTES);
    LOGGER.info("Loaded state for table {}. Found {} partitions. Next initialise time: {}",
            tableProperties.getStatus(), partitions.size(), nextInitialiseTime);
}

/**
 * Splits up a bulk export query into one sub-query per leaf partition that has files to read.
 * For each leaf partition, the files are those referenced on the partition itself plus any
 * files still referenced on its ancestors in the partition tree. Leaf partitions with no
 * files are skipped. Each sub-query is given a new sub export ID from the ID supplier.
 *
 * @param  bulkExportQuery the query to be split up
 * @return                 a list of {@link BulkExportLeafPartitionQuery}s, empty if no leaf partition has files
 */
public List<BulkExportLeafPartitionQuery> splitIntoLeafPartitionQueries(BulkExportQuery bulkExportQuery) {
    LOGGER.debug("There are {} relevant leaf partitions", leafPartitions.size());

    List<BulkExportLeafPartitionQuery> leafPartitionQueriesList = new ArrayList<>();
    for (Partition partition : leafPartitions) {
        List<String> files = getFiles(partition);

        if (files.isEmpty()) {
            // Nothing to export for this partition, so no sub-query is created for it
            LOGGER.info("No files for partition {}", partition.getId());
            continue;
        }

        // For each leaf partition, create query with pre-populated list of files that
        // need to be read.
        BulkExportLeafPartitionQuery bulkExportLeafPartitionQuery = BulkExportLeafPartitionQuery
                .forPartition(bulkExportQuery, tableProperties, partition)
                .subExportId(idSupplier.get())
                .files(files)
                .build();
        LOGGER.debug("Created {}", bulkExportLeafPartitionQuery);
        leafPartitionQueriesList.add(bulkExportLeafPartitionQuery);
    }

    return leafPartitionQueriesList;
}

/**
 * Gets the file paths to read for a partition. This is the files referenced on the
 * partition itself, followed by the files referenced on each of its ancestors in the
 * partition tree.
 *
 * @param  partition which partition to get the files for
 * @return           a list of files, empty if neither the partition nor its ancestors have any
 */
protected List<String> getFiles(Partition partition) {
    // Get all partitions up to the root of the tree
    List<String> relevantPartitions = new ArrayList<>();
    relevantPartitions.add(partition.getId());
    relevantPartitions.addAll(partitionTree.getAllAncestorIds(partition.getId()));

    // Get relevant files; a partition with no entry in the mapping contributes nothing
    List<String> files = new ArrayList<>();
    for (String partitionId : relevantPartitions) {
        List<String> filesForPartition = partitionToFiles.get(partitionId);
        if (filesForPartition != null) {
            // Reuse the list already fetched rather than looking it up again
            files.addAll(filesForPartition);
        }
    }
    return files;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.bulkexport.model;
package sleeper.bulkexport.core.model;

import com.google.common.io.CharStreams;
import org.approvaltests.Approvals;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.bulkexport.model;
package sleeper.bulkexport.core.model;

import org.junit.jupiter.api.Test;

Expand Down
Loading
Loading