Skip to content

Commit

Permalink
Merge pull request #3029 from gchq/3027-support-committing-partition-…
Browse files Browse the repository at this point in the history
…splits-in-state-store-committer-lambda

Issue 3027 - Support committing partition splits in state store committer lambda
  • Loading branch information
rtjd6554 authored Aug 7, 2024
2 parents b2701ea + fa23fd7 commit ffbd0d4
Show file tree
Hide file tree
Showing 14 changed files with 497 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ public enum CommitRequestType {
COMPACTION_FINISHED,
INGEST_ADD_FILES,
STORED_IN_S3,
COMPACTION_JOB_ID_ASSIGNMENT
COMPACTION_JOB_ID_ASSIGNMENT,
SPLIT_PARTITION
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2022-2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.core.statestore.commit;

import sleeper.core.partition.Partition;

import java.util.Objects;

/**
* A request to commit to the state store when we split a partition into new child partitions.
*/
public class SplitPartitionCommitRequest {

private final String tableId;
private final Partition parentPartition;
private final Partition leftChild;
private final Partition rightChild;

public SplitPartitionCommitRequest(String tableId, Partition parentPartition, Partition leftChild, Partition rightChild) {
this.tableId = tableId;
this.parentPartition = parentPartition;
this.leftChild = leftChild;
this.rightChild = rightChild;
}

public String getTableId() {
return tableId;
}

public Partition getParentPartition() {
return parentPartition;
}

public Partition getLeftChild() {
return leftChild;
}

public Partition getRightChild() {
return rightChild;
}

@Override
public int hashCode() {
return Objects.hash(tableId, parentPartition, leftChild, rightChild);
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof SplitPartitionCommitRequest)) {
return false;
}
SplitPartitionCommitRequest other = (SplitPartitionCommitRequest) obj;
return Objects.equals(tableId, other.tableId) && Objects.equals(parentPartition, other.parentPartition) && Objects.equals(leftChild, other.leftChild)
&& Objects.equals(rightChild, other.rightChild);
}

@Override
public String toString() {
return "SplitPartitionCommitRequest{tableId=" + tableId + ", parentPartition=" + parentPartition + ", leftChild=" + leftChild + ", rightChild=" + rightChild + "}";
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright 2022-2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.core.statestore.commit;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

import sleeper.core.partition.Partition;
import sleeper.core.partition.PartitionSerDe.PartitionJsonSerDe;
import sleeper.core.schema.Schema;
import sleeper.core.util.GsonConfig;

/**
* Serialises and deserialises a commit request to split a partition.
*/
public class SplitPartitionCommitRequestSerDe {

private final Gson gson;
private final Gson gsonPrettyPrint;

public SplitPartitionCommitRequestSerDe(Schema schema) {
GsonBuilder builder = GsonConfig.standardBuilder()
.registerTypeAdapter(Partition.class, new PartitionJsonSerDe(schema))
.serializeNulls();
gson = builder.create();
gsonPrettyPrint = builder.setPrettyPrinting().create();
}

/**
* Serialises a split partition commit request to a JSON string.
*
* @param request the commit request
* @return the JSON string
*/
public String toJson(SplitPartitionCommitRequest request) {
return gson.toJson(new WrappedCommitRequest(request), WrappedCommitRequest.class);
}

/**
* Serialises a split partition commit request to a pretty-printed JSON string.
*
* @param request the commit request
* @return the pretty-printed JSON string
*/
public String toJsonPrettyPrint(SplitPartitionCommitRequest request) {
return gsonPrettyPrint.toJson(new WrappedCommitRequest(request), WrappedCommitRequest.class);
}

/**
* Deserialises a split partition commit request from a JSON string.
*
* @param json the JSON string
* @return the commit request
*/
public SplitPartitionCommitRequest fromJson(String json) {
WrappedCommitRequest wrappedRequest = gson.fromJson(json, WrappedCommitRequest.class);
if (CommitRequestType.SPLIT_PARTITION == wrappedRequest.type) {
return wrappedRequest.request;
}
throw new IllegalArgumentException("Unexpected request type");
}

/**
* Stores a split partition commit request with the type of commit request. Used by the state store committer to
* deserialise the correct commit request.
*/
private static class WrappedCommitRequest {
private final CommitRequestType type;
private final SplitPartitionCommitRequest request;

WrappedCommitRequest(SplitPartitionCommitRequest request) {
this.type = CommitRequestType.SPLIT_PARTITION;
this.request = request;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

import sleeper.core.statestore.FileReferenceSerDe;
import sleeper.core.util.GsonConfig;

/**
Expand All @@ -29,8 +28,7 @@ public class StateStoreCommitRequestInS3SerDe {
private final Gson gsonPrettyPrint;

public StateStoreCommitRequestInS3SerDe() {
GsonBuilder builder = GsonConfig.standardBuilder()
.addSerializationExclusionStrategy(FileReferenceSerDe.excludeUpdateTimes());
GsonBuilder builder = GsonConfig.standardBuilder();
gson = builder.create();
gsonPrettyPrint = builder.setPrettyPrinting().create();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2022-2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.core.statestore.commit;

import org.approvaltests.Approvals;
import org.junit.jupiter.api.Test;

import sleeper.core.partition.PartitionTree;
import sleeper.core.partition.PartitionsBuilder;
import sleeper.core.schema.Schema;
import sleeper.core.schema.type.StringType;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static sleeper.core.schema.SchemaTestHelper.schemaWithKey;

public class SplitPartitionCommitRequestSerDeTest {

private final Schema schema = schemaWithKey("key", new StringType());
private final SplitPartitionCommitRequestSerDe serDe = new SplitPartitionCommitRequestSerDe(schema);

@Test
void shouldSerialiseSplitPartitionRequest() {
// Given
PartitionTree partitionTree = new PartitionsBuilder(schema)
.rootFirst("root")
.splitToNewChildren("root", "left", "right", "aaa")
.buildTree();
SplitPartitionCommitRequest splitPartitionCommitRequest = new SplitPartitionCommitRequest(
"test-table",
partitionTree.getRootPartition(),
partitionTree.getPartition("left"), partitionTree.getPartition("right"));

// When
String json = serDe.toJsonPrettyPrint(splitPartitionCommitRequest);

// Then
assertThat(serDe.fromJson(json)).isEqualTo(splitPartitionCommitRequest);
Approvals.verify(json);
}

@Test
void shouldFailToDeserialiseNonSplitPartitionCommitRequest() {
assertThatThrownBy(() -> serDe.fromJson("{\"type\": \"OTHER\", \"request\":{}}"))
.isInstanceOf(IllegalArgumentException.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
{
"type": "SPLIT_PARTITION",
"request": {
"tableId": "test-table",
"parentPartition": {
"partitionId": "root",
"isLeafPartition": false,
"parentPartitionId": null,
"childPartitionIds": [
"left",
"right"
],
"region": {
"key": {
"min": "",
"minInclusive": true,
"max": null,
"maxInclusive": false
},
"stringsBase64Encoded": true
},
"dimension": 0
},
"leftChild": {
"partitionId": "left",
"isLeafPartition": true,
"parentPartitionId": "root",
"childPartitionIds": [],
"region": {
"key": {
"min": "",
"minInclusive": true,
"max": "YWFh",
"maxInclusive": false
},
"stringsBase64Encoded": true
},
"dimension": -1
},
"rightChild": {
"partitionId": "right",
"isLeafPartition": true,
"parentPartitionId": "root",
"childPartitionIds": [],
"region": {
"key": {
"min": "YWFh",
"minInclusive": true,
"max": null,
"maxInclusive": false
},
"stringsBase64Encoded": true
},
"dimension": -1
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,11 @@ public class SplitPartitionJobDefinitionSerDe {
private final Gson gsonPrettyPrinting;

public SplitPartitionJobDefinitionSerDe(TablePropertiesProvider tablePropertiesProvider) {
try {
GsonBuilder builder = new GsonBuilder()
.registerTypeAdapter(Class.forName(SplitPartitionJobDefinition.class.getName()), new SplitPartitionJobDefinitionJsonSerDe(tablePropertiesProvider))
.serializeNulls();
this.gson = builder.create();
this.gsonPrettyPrinting = builder.setPrettyPrinting().create();
} catch (ClassNotFoundException e) {
throw new RuntimeException("Exception creating Gson", e);
}
GsonBuilder builder = new GsonBuilder()
.registerTypeAdapter(SplitPartitionJobDefinition.class, new SplitPartitionJobDefinitionJsonSerDe(tablePropertiesProvider))
.serializeNulls();
this.gson = builder.create();
this.gsonPrettyPrinting = builder.setPrettyPrinting().create();
}

public String toJson(SplitPartitionJobDefinition splitPartitionJobDefinition) {
Expand Down
7 changes: 7 additions & 0 deletions java/statestore-commit/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>sleeper</groupId>
<artifactId>configuration</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>sleeper</groupId>
<artifactId>compaction-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import sleeper.compaction.job.commit.CompactionJobCommitRequest;
import sleeper.compaction.job.commit.CompactionJobIdAssignmentCommitRequest;
import sleeper.core.statestore.commit.SplitPartitionCommitRequest;
import sleeper.core.statestore.commit.StateStoreCommitRequestInS3;
import sleeper.ingest.job.commit.IngestAddFilesCommitRequest;

Expand Down Expand Up @@ -59,6 +60,16 @@ public static StateStoreCommitRequest forIngestAddFiles(IngestAddFilesCommitRequ
return new StateStoreCommitRequest(request);
}

/**
* Creates a request to commit a partition split to add new child partitions.
*
* @param request the commit request
* @return a state store commit request
*/
public static StateStoreCommitRequest forSplitPartition(SplitPartitionCommitRequest request) {
return new StateStoreCommitRequest(request);
}

/**
* Creates a request which is stored in S3.
*
Expand Down
Loading

0 comments on commit ffbd0d4

Please sign in to comment.