Skip to content

Commit 17ed9ae

Browse files
authored
Merge pull request #3700 from gchq/3699-check-many-job-file-assignments
Issue 3699 - Check many job file assignments in one query
2 parents d24ad0a + a92a2e9 commit 17ed9ae

File tree

9 files changed

+188
-56
lines changed

9 files changed

+188
-56
lines changed

java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/CompactionJob.java

+6
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package sleeper.compaction.core.job;
1717

18+
import sleeper.core.statestore.CheckFileAssignmentsRequest;
19+
1820
import java.util.List;
1921
import java.util.Objects;
2022

@@ -65,6 +67,10 @@ public List<String> getInputFiles() {
6567
return inputFiles;
6668
}
6769

70+
public CheckFileAssignmentsRequest createInputFileAssignmentsCheck() {
71+
return CheckFileAssignmentsRequest.isJobAssignedToFilesOnPartition(jobId, inputFiles, partitionId);
72+
}
73+
6874
/**
6975
* Checks that there are no duplicate entries present in the list of files.
7076
*

java/compaction/compaction-core/src/main/java/sleeper/compaction/core/task/StateStoreWaitForFiles.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
import java.time.Duration;
3737
import java.time.Instant;
38+
import java.util.List;
3839
import java.util.function.Supplier;
3940

4041
import static sleeper.compaction.core.job.status.CompactionJobFailedEvent.compactionJobFailed;
@@ -112,7 +113,7 @@ private boolean allFilesAssignedToJob(
112113
try {
113114
DynamoDBUtils.retryOnThrottlingException(throttlingRetries, () -> {
114115
try {
115-
result.set(stateStore.isPartitionFilesAssignedToJob(job.getPartitionId(), job.getInputFiles(), job.getId()));
116+
result.set(stateStore.isAssigned(List.of(job.createInputFileAssignmentsCheck())));
116117
} catch (StateStoreException e) {
117118
throw new UncheckedStateStoreException(e);
118119
}

java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/ECSCompactionTaskRunnerLocalStackIT.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
import sleeper.core.schema.Field;
7171
import sleeper.core.schema.Schema;
7272
import sleeper.core.schema.type.LongType;
73+
import sleeper.core.statestore.CheckFileAssignmentsRequest;
7374
import sleeper.core.statestore.FileReference;
7475
import sleeper.core.statestore.ReplaceFileReferencesRequest;
7576
import sleeper.core.statestore.StateStore;
@@ -276,7 +277,8 @@ void shouldPutMessageBackOnSQSQueueIfStateStoreUpdateFailed() throws Exception {
276277
}).when(stateStore).atomicallyReplaceFileReferencesWithNewOnes(anyList());
277278
FileReference fileReference1 = ingestFileWith100Records();
278279
FileReference fileReference2 = ingestFileWith100Records();
279-
when(stateStore.isPartitionFilesAssignedToJob("root", List.of(fileReference1.getFilename(), fileReference2.getFilename()), "job1"))
280+
when(stateStore.isAssigned(List.of(CheckFileAssignmentsRequest.isJobAssignedToFilesOnPartition(
281+
"job1", List.of(fileReference1.getFilename(), fileReference2.getFilename()), "root"))))
280282
.thenReturn(true);
281283
String jobJson = sendCompactionJobForFilesGetJson("job1", "output1.parquet", fileReference1, fileReference2);
282284

@@ -301,7 +303,8 @@ void shouldMoveMessageToDLQIfStateStoreUpdateFailedTooManyTimes() throws Excepti
301303
}).when(stateStore).atomicallyReplaceFileReferencesWithNewOnes(anyList());
302304
FileReference fileReference1 = ingestFileWith100Records();
303305
FileReference fileReference2 = ingestFileWith100Records();
304-
when(stateStore.isPartitionFilesAssignedToJob("root", List.of(fileReference1.getFilename(), fileReference2.getFilename()), "job1"))
306+
when(stateStore.isAssigned(List.of(CheckFileAssignmentsRequest.isJobAssignedToFilesOnPartition(
307+
"job1", List.of(fileReference1.getFilename(), fileReference2.getFilename()), "root"))))
305308
.thenReturn(true);
306309
String jobJson = sendCompactionJobForFilesGetJson("job1", "output1.parquet", fileReference1, fileReference2);
307310

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright 2022-2024 Crown Copyright
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package sleeper.core.statestore;
17+
18+
import java.util.List;
19+
import java.util.Objects;
20+
21+
/**
 * A request to check whether a job is assigned to files on a partition. Used for compaction jobs.
 * <p>
 * Instances are immutable. The list of filenames is copied when the request is created, so later changes to the
 * caller's list do not affect the request, its equality or its hash code.
 */
public class CheckFileAssignmentsRequest {

    private final String jobId;
    private final List<String> filenames;
    private final String partitionId;

    private CheckFileAssignmentsRequest(String jobId, List<String> filenames, String partitionId) {
        this.jobId = Objects.requireNonNull(jobId, "jobId must not be null");
        // Take an unmodifiable snapshot so the request cannot change after it has been created
        this.filenames = List.copyOf(Objects.requireNonNull(filenames, "filenames must not be null"));
        this.partitionId = Objects.requireNonNull(partitionId, "partitionId must not be null");
    }

    /**
     * Creates a request to check whether a job is assigned to files on a partition.
     *
     * @param  jobId                the job ID
     * @param  filenames            the filenames
     * @param  partitionId          the partition ID
     * @return                      the request
     * @throws NullPointerException if any argument is null, or if any filename is null
     */
    public static CheckFileAssignmentsRequest isJobAssignedToFilesOnPartition(String jobId, List<String> filenames, String partitionId) {
        return new CheckFileAssignmentsRequest(jobId, filenames, partitionId);
    }

    /**
     * Retrieves the ID of the job the files are expected to be assigned to.
     *
     * @return the job ID
     */
    public String getJobId() {
        return jobId;
    }

    /**
     * Retrieves the names of the files to check.
     *
     * @return an unmodifiable list of the filenames
     */
    public List<String> getFilenames() {
        return filenames;
    }

    /**
     * Retrieves the ID of the partition the files are referenced on.
     *
     * @return the partition ID
     */
    public String getPartitionId() {
        return partitionId;
    }

    @Override
    public int hashCode() {
        return Objects.hash(jobId, filenames, partitionId);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof CheckFileAssignmentsRequest)) {
            return false;
        }
        CheckFileAssignmentsRequest other = (CheckFileAssignmentsRequest) obj;
        return Objects.equals(jobId, other.jobId) && Objects.equals(filenames, other.filenames) && Objects.equals(partitionId, other.partitionId);
    }

    @Override
    public String toString() {
        return "CheckFileAssignmentsRequest{jobId=" + jobId + ", filenames=" + filenames + ", partitionId=" + partitionId + "}";
    }
}

java/core/src/main/java/sleeper/core/statestore/FileReferenceStoreQueries.java

+18-17
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,9 @@
2323
import java.util.HashMap;
2424
import java.util.List;
2525
import java.util.Map;
26-
import java.util.Objects;
2726
import java.util.stream.Stream;
2827

29-
import static java.util.stream.Collectors.toMap;
28+
import static java.util.stream.Collectors.groupingBy;
3029

3130
/**
3231
* Serves queries about the data files in a Sleeper table. This includes a count of the number of references
@@ -66,26 +65,28 @@ public interface FileReferenceStoreQueries {
6665
List<FileReference> getFileReferencesWithNoJobId() throws StateStoreException;
6766

6867
/**
69-
* Checks if files on a given partition are assigned to a certain job.
68+
* Checks if files are assigned to jobs.
7069
*
71-
* @param partitionId the ID of the partition to query
72-
* @return a list of {@link FileReference}s on the partition
70+
* @param requests the definitions of which files should be assigned to which jobs
71+
* @return true if every file in each request is assigned to that request's job, false if any file is not yet assigned to a job
7372
* @throws StateStoreException if query fails
7473
*/
75-
default boolean isPartitionFilesAssignedToJob(String partitionId, List<String> filenames, String jobId) throws StateStoreException {
74+
default boolean isAssigned(List<CheckFileAssignmentsRequest> requests) throws StateStoreException {
7675
List<FileReference> fileReferences = getFileReferences();
77-
Map<String, FileReference> partitionFileByName = fileReferences.stream()
78-
.filter(reference -> Objects.equals(partitionId, reference.getPartitionId()))
79-
.collect(toMap(FileReference::getFilename, f -> f));
76+
Map<String, List<FileReference>> referencesByPartitionId = fileReferences.stream()
77+
.collect(groupingBy(FileReference::getPartitionId));
8078
boolean allAssigned = true;
81-
for (String filename : filenames) {
82-
FileReference reference = partitionFileByName.get(filename);
83-
if (reference == null) {
84-
throw new FileReferenceNotFoundException(filename, partitionId);
85-
} else if (reference.getJobId() == null) {
86-
allAssigned = false;
87-
} else if (!reference.getJobId().equals(jobId)) {
88-
throw new FileReferenceAssignedToJobException(reference);
79+
for (CheckFileAssignmentsRequest request : requests) {
80+
List<FileReference> references = referencesByPartitionId.getOrDefault(request.getPartitionId(), List.of());
81+
for (String filename : request.getFilenames()) {
82+
FileReference reference = references.stream().filter(ref -> filename.equals(ref.getFilename())).findFirst().orElse(null);
83+
if (reference == null) {
84+
throw new FileReferenceNotFoundException(filename, request.getPartitionId());
85+
} else if (reference.getJobId() == null) {
86+
allAssigned = false;
87+
} else if (!reference.getJobId().equals(request.getJobId())) {
88+
throw new FileReferenceAssignedToJobException(reference);
89+
}
8990
}
9091
}
9192
return allAssigned;

java/core/src/test/java/sleeper/core/statestore/testutils/InMemoryFileReferenceStoreTest.java

+19-9
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import sleeper.core.schema.type.LongType;
2424
import sleeper.core.statestore.AllReferencesToAFile;
2525
import sleeper.core.statestore.AllReferencesToAllFiles;
26+
import sleeper.core.statestore.CheckFileAssignmentsRequest;
2627
import sleeper.core.statestore.FileReference;
2728
import sleeper.core.statestore.SplitFileReferenceRequest;
2829
import sleeper.core.statestore.SplitFileReferences;
@@ -597,7 +598,8 @@ void shouldFilesNotYetAssigned() throws Exception {
597598
store.addFiles(List.of(file1, file2));
598599

599600
// When / Then
600-
assertThat(store.isPartitionFilesAssignedToJob("root", List.of("file1", "file2"), "test-job"))
601+
assertThat(store.isAssigned(List.of(CheckFileAssignmentsRequest.isJobAssignedToFilesOnPartition(
602+
"test-job", List.of("file1", "file2"), "root"))))
601603
.isFalse();
602604
}
603605

@@ -610,7 +612,8 @@ void shouldCheckAllFilesAssigned() throws Exception {
610612
store.assignJobIds(List.of(assignJobOnPartitionToFiles("test-job", "root", List.of("file1", "file2"))));
611613

612614
// When / Then
613-
assertThat(store.isPartitionFilesAssignedToJob("root", List.of("file1", "file2"), "test-job"))
615+
assertThat(store.isAssigned(List.of(CheckFileAssignmentsRequest.isJobAssignedToFilesOnPartition(
616+
"test-job", List.of("file1", "file2"), "root"))))
614617
.isTrue();
615618
}
616619

@@ -623,7 +626,8 @@ void shouldCheckSomeFilesAssigned() throws Exception {
623626
store.assignJobIds(List.of(assignJobOnPartitionToFiles("test-job", "root", List.of("file1"))));
624627

625628
// When / Then
626-
assertThat(store.isPartitionFilesAssignedToJob("root", List.of("file1", "file2"), "test-job"))
629+
assertThat(store.isAssigned(List.of(CheckFileAssignmentsRequest.isJobAssignedToFilesOnPartition(
630+
"test-job", List.of("file1", "file2"), "root"))))
627631
.isFalse();
628632
}
629633

@@ -641,16 +645,19 @@ void shouldCheckFilesAssignedOnOnePartition() throws Exception {
641645
store.assignJobIds(List.of(assignJobOnPartitionToFiles("test-job", "L", List.of("file1", "file2"))));
642646

643647
// When / Then
644-
assertThat(store.isPartitionFilesAssignedToJob("R", List.of("file1", "file2"), "test-job"))
648+
assertThat(store.isAssigned(List.of(CheckFileAssignmentsRequest.isJobAssignedToFilesOnPartition(
649+
"test-job", List.of("file1", "file2"), "R"))))
645650
.isFalse();
646-
assertThat(store.isPartitionFilesAssignedToJob("L", List.of("file1", "file2"), "test-job"))
651+
assertThat(store.isAssigned(List.of(CheckFileAssignmentsRequest.isJobAssignedToFilesOnPartition(
652+
"test-job", List.of("file1", "file2"), "L"))))
647653
.isTrue();
648654
}
649655

650656
@Test
651657
void shouldFailIfFileDoesNotExist() {
652658
// When / Then
653-
assertThatThrownBy(() -> store.isPartitionFilesAssignedToJob("root", List.of("file"), "test-job"))
659+
assertThatThrownBy(() -> store.isAssigned(List.of(CheckFileAssignmentsRequest.isJobAssignedToFilesOnPartition(
660+
"test-job", List.of("file"), "root"))))
654661
.isInstanceOf(FileReferenceNotFoundException.class);
655662
}
656663

@@ -661,7 +668,8 @@ void shouldFailIfFileDoesNotExistOnPartition() throws Exception {
661668
store.addFile(factory.partitionFile("L", "file", 100L));
662669

663670
// When / Then
664-
assertThatThrownBy(() -> store.isPartitionFilesAssignedToJob("R", List.of("file"), "test-job"))
671+
assertThatThrownBy(() -> store.isAssigned(List.of(CheckFileAssignmentsRequest.isJobAssignedToFilesOnPartition(
672+
"test-job", List.of("file"), "R"))))
665673
.isInstanceOf(FileReferenceNotFoundException.class);
666674
}
667675

@@ -672,7 +680,8 @@ void shouldFailIfFileAssignedToOtherJob() throws Exception {
672680
store.assignJobIds(List.of(assignJobOnPartitionToFiles("A", "root", List.of("file"))));
673681

674682
// When / Then
675-
assertThatThrownBy(() -> store.isPartitionFilesAssignedToJob("root", List.of("file"), "B"))
683+
assertThatThrownBy(() -> store.isAssigned(List.of(CheckFileAssignmentsRequest.isJobAssignedToFilesOnPartition(
684+
"B", List.of("file"), "root"))))
676685
.isInstanceOf(FileReferenceAssignedToJobException.class);
677686
}
678687

@@ -682,7 +691,8 @@ void shouldFailIfOneFileDoesNotExist() throws Exception {
682691
store.addFile(factory.rootFile("file1", 100L));
683692

684693
// When / Then
685-
assertThatThrownBy(() -> store.isPartitionFilesAssignedToJob("root", List.of("file1", "file2"), "test-job"))
694+
assertThatThrownBy(() -> store.isAssigned(List.of(CheckFileAssignmentsRequest.isJobAssignedToFilesOnPartition(
695+
"test-job", List.of("file1", "file2"), "root"))))
686696
.isInstanceOf(FileReferenceNotFoundException.class);
687697
}
688698
}

java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStoreTest.java

+19-9
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import sleeper.core.schema.type.LongType;
2424
import sleeper.core.statestore.AllReferencesToAFile;
2525
import sleeper.core.statestore.AllReferencesToAllFiles;
26+
import sleeper.core.statestore.CheckFileAssignmentsRequest;
2627
import sleeper.core.statestore.FileReference;
2728
import sleeper.core.statestore.SplitFileReferenceRequest;
2829
import sleeper.core.statestore.SplitFileReferences;
@@ -597,7 +598,8 @@ void shouldFilesNotYetAssigned() throws Exception {
597598
store.addFiles(List.of(file1, file2));
598599

599600
// When / Then
600-
assertThat(store.isPartitionFilesAssignedToJob("root", List.of("file1", "file2"), "test-job"))
601+
assertThat(store.isAssigned(List.of(CheckFileAssignmentsRequest.isJobAssignedToFilesOnPartition(
602+
"test-job", List.of("file1", "file2"), "root"))))
601603
.isFalse();
602604
}
603605

@@ -610,7 +612,8 @@ void shouldCheckAllFilesAssigned() throws Exception {
610612
store.assignJobIds(List.of(assignJobOnPartitionToFiles("test-job", "root", List.of("file1", "file2"))));
611613

612614
// When / Then
613-
assertThat(store.isPartitionFilesAssignedToJob("root", List.of("file1", "file2"), "test-job"))
615+
assertThat(store.isAssigned(List.of(CheckFileAssignmentsRequest.isJobAssignedToFilesOnPartition(
616+
"test-job", List.of("file1", "file2"), "root"))))
614617
.isTrue();
615618
}
616619

@@ -623,7 +626,8 @@ void shouldCheckSomeFilesAssigned() throws Exception {
623626
store.assignJobIds(List.of(assignJobOnPartitionToFiles("test-job", "root", List.of("file1"))));
624627

625628
// When / Then
626-
assertThat(store.isPartitionFilesAssignedToJob("root", List.of("file1", "file2"), "test-job"))
629+
assertThat(store.isAssigned(List.of(CheckFileAssignmentsRequest.isJobAssignedToFilesOnPartition(
630+
"test-job", List.of("file1", "file2"), "root"))))
627631
.isFalse();
628632
}
629633

@@ -641,16 +645,19 @@ void shouldCheckFilesAssignedOnOnePartition() throws Exception {
641645
store.assignJobIds(List.of(assignJobOnPartitionToFiles("test-job", "L", List.of("file1", "file2"))));
642646

643647
// When / Then
644-
assertThat(store.isPartitionFilesAssignedToJob("R", List.of("file1", "file2"), "test-job"))
648+
assertThat(store.isAssigned(List.of(CheckFileAssignmentsRequest.isJobAssignedToFilesOnPartition(
649+
"test-job", List.of("file1", "file2"), "R"))))
645650
.isFalse();
646-
assertThat(store.isPartitionFilesAssignedToJob("L", List.of("file1", "file2"), "test-job"))
651+
assertThat(store.isAssigned(List.of(CheckFileAssignmentsRequest.isJobAssignedToFilesOnPartition(
652+
"test-job", List.of("file1", "file2"), "L"))))
647653
.isTrue();
648654
}
649655

650656
@Test
651657
void shouldFailIfFileDoesNotExist() {
652658
// When / Then
653-
assertThatThrownBy(() -> store.isPartitionFilesAssignedToJob("root", List.of("file"), "test-job"))
659+
assertThatThrownBy(() -> store.isAssigned(List.of(CheckFileAssignmentsRequest.isJobAssignedToFilesOnPartition(
660+
"test-job", List.of("file"), "root"))))
654661
.isInstanceOf(FileReferenceNotFoundException.class);
655662
}
656663

@@ -661,7 +668,8 @@ void shouldFailIfFileDoesNotExistOnPartition() throws Exception {
661668
store.addFile(factory.partitionFile("L", "file", 100L));
662669

663670
// When / Then
664-
assertThatThrownBy(() -> store.isPartitionFilesAssignedToJob("R", List.of("file"), "test-job"))
671+
assertThatThrownBy(() -> store.isAssigned(List.of(CheckFileAssignmentsRequest.isJobAssignedToFilesOnPartition(
672+
"test-job", List.of("file"), "R"))))
665673
.isInstanceOf(FileReferenceNotFoundException.class);
666674
}
667675

@@ -672,7 +680,8 @@ void shouldFailIfFileAssignedToOtherJob() throws Exception {
672680
store.assignJobIds(List.of(assignJobOnPartitionToFiles("A", "root", List.of("file"))));
673681

674682
// When / Then
675-
assertThatThrownBy(() -> store.isPartitionFilesAssignedToJob("root", List.of("file"), "B"))
683+
assertThatThrownBy(() -> store.isAssigned(List.of(CheckFileAssignmentsRequest.isJobAssignedToFilesOnPartition(
684+
"B", List.of("file"), "root"))))
676685
.isInstanceOf(FileReferenceAssignedToJobException.class);
677686
}
678687

@@ -682,7 +691,8 @@ void shouldFailIfOneFileDoesNotExist() throws Exception {
682691
store.addFile(factory.rootFile("file1", 100L));
683692

684693
// When / Then
685-
assertThatThrownBy(() -> store.isPartitionFilesAssignedToJob("root", List.of("file1", "file2"), "test-job"))
694+
assertThatThrownBy(() -> store.isAssigned(List.of(CheckFileAssignmentsRequest.isJobAssignedToFilesOnPartition(
695+
"test-job", List.of("file1", "file2"), "root"))))
686696
.isInstanceOf(FileReferenceNotFoundException.class);
687697
}
688698
}

0 commit comments

Comments
 (0)