From 3ef7f322e66a670eef84af7aa68a3c36289cb8f7 Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Fri, 8 Nov 2024 15:52:32 +0000 Subject: [PATCH 01/20] Add more files to GarbageCollectionST --- .../dsl/gc/GarbageCollectionTest.java | 36 +-- ...geCollectFilesAfterCompaction.approved.txt | 206 +++++++++++++++++- .../systemtest/suite/GarbageCollectionST.java | 30 ++- ...geCollectFilesAfterCompaction.approved.txt | 206 +++++++++++++++++- 4 files changed, 447 insertions(+), 31 deletions(-) diff --git a/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/gc/GarbageCollectionTest.java b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/gc/GarbageCollectionTest.java index b077926b54..f3f42bbf35 100644 --- a/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/gc/GarbageCollectionTest.java +++ b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/gc/GarbageCollectionTest.java @@ -23,13 +23,19 @@ import sleeper.systemtest.dsl.SleeperSystemTest; import sleeper.systemtest.dsl.extension.AfterTestPurgeQueues; import sleeper.systemtest.dsl.extension.AfterTestReports; +import sleeper.systemtest.dsl.ingest.SystemTestDirectIngest; import sleeper.systemtest.dsl.sourcedata.RecordNumbers; import sleeper.systemtest.dsl.testutil.InMemoryDslTest; -import sleeper.systemtest.dsl.testutil.InMemorySystemTestDrivers; +import java.nio.file.Path; +import java.util.HashSet; import java.util.Map; +import java.util.Set; +import java.util.stream.IntStream; import java.util.stream.LongStream; +import java.util.stream.StreamSupport; +import static java.util.stream.Collectors.toSet; import static org.assertj.core.api.Assertions.assertThat; import static sleeper.core.properties.table.TableProperty.COMPACTION_FILES_BATCH_SIZE; import static sleeper.core.properties.table.TableProperty.COMPACTION_STRATEGY_CLASS; @@ -39,6 +45,7 @@ @InMemoryDslTest public class GarbageCollectionTest { + private Path tempDir; @BeforeEach void setUp(SleeperSystemTest sleeper, AfterTestReports reporting, AfterTestPurgeQueues purgeQueues) { @@ -46,28 +53,29 @@ void setUp(SleeperSystemTest sleeper, AfterTestReports reporting, AfterTestPurge } @Test - void shouldGarbageCollectFilesAfterCompaction(SleeperSystemTest sleeper, InMemorySystemTestDrivers drivers) { + void shouldGarbageCollectFilesAfterCompaction(SleeperSystemTest sleeper) { // Given sleeper.updateTableProperties(Map.of( COMPACTION_STRATEGY_CLASS, BasicCompactionStrategy.class.getName(), - COMPACTION_FILES_BATCH_SIZE, "5", + COMPACTION_FILES_BATCH_SIZE, "10", GARBAGE_COLLECTOR_DELAY_BEFORE_DELETION, "0")); - RecordNumbers numbers = sleeper.scrambleNumberedRecords(LongStream.range(0, 50)); - sleeper.ingest().direct(null) - .numberedRecords(numbers.range(0, 10)) - .numberedRecords(numbers.range(10, 20)) - .numberedRecords(numbers.range(20, 30)) - .numberedRecords(numbers.range(30, 40)) - .numberedRecords(numbers.range(40, 50)); - sleeper.compaction().createJobs(1).invokeTasks(1).waitForJobs(); + RecordNumbers numbers = sleeper.scrambleNumberedRecords(LongStream.range(0, 20_000)); + SystemTestDirectIngest ingest = sleeper.ingest().direct(tempDir); + IntStream.range(0, 2000) + .mapToObj(i -> numbers.range(i * 10, i * 10 + 10)) + .forEach(range -> ingest.numberedRecords(range)); + sleeper.compaction().createJobs(200).invokeTasks(1).waitForJobs(); // When sleeper.garbageCollection().invoke().waitFor(); // Then - assertThat(sleeper.directQuery().allRecordsInTable()) - .containsExactlyInAnyOrderElementsOf(sleeper.generateNumberedRecords(LongStream.range(0, 50))); + assertThat(new HashSet<>(sleeper.directQuery().allRecordsInTable())) + .isEqualTo(setFrom(sleeper.generateNumberedRecords(LongStream.range(0, 20_000)))); Approvals.verify(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all())); - assertThat(drivers.data().files()).hasSize(1); + } + + private static Set setFrom(Iterable iterable) { + return StreamSupport.stream(iterable.spliterator(), false).collect(toSet()); } } diff --git a/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/gc/GarbageCollectionTest.shouldGarbageCollectFilesAfterCompaction.approved.txt b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/gc/GarbageCollectionTest.shouldGarbageCollectFilesAfterCompaction.approved.txt index acb9e171e4..95c9584864 100644 --- a/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/gc/GarbageCollectionTest.shouldGarbageCollectFilesAfterCompaction.approved.txt +++ b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/gc/GarbageCollectionTest.shouldGarbageCollectFilesAfterCompaction.approved.txt @@ -1,4 +1,204 @@ Unreferenced files: 0 -Referenced files: 1 -File references: 1 -Partition at root: 50 records in file 1 +Referenced files: 200 +File references: 200 +Partition at root: +100 records in file 1 +100 records in file 2 +100 records in file 3 +100 records in file 4 +100 records in file 5 +100 records in file 6 +100 records in file 7 +100 records in file 8 +100 records in file 9 +100 records in file 10 +100 records in file 11 +100 records in file 12 +100 records in file 13 +100 records in file 14 +100 records in file 15 +100 records in file 16 +100 records in file 17 +100 records in file 18 +100 records in file 19 +100 records in file 20 +100 records in file 21 +100 records in file 22 +100 records in file 23 +100 records in file 24 +100 records in file 25 +100 records in file 26 +100 records in file 27 +100 records in file 28 +100 records in file 29 +100 records in file 30 +100 records in file 31 +100 records in file 32 +100 records in file 33 +100 records in file 34 +100 records in file 35 +100 records in file 36 +100 records in file 37 +100 records in file 38 +100 records in file 39 +100 records in file 40 +100 records in file 41 +100 records in file 42 +100 records in file 43 +100 records in file 44 +100 records in file 45 +100 records in file 46 +100 records in file 47 +100 records in file 48 +100 records in file 49 +100 records in file 50 +100 records in file 51 +100 records in file 52 +100 records in file 53 +100 records in file 54 +100 records in file 55 +100 records in file 56 +100 records in file 57 +100 records in file 58 +100 records in file 59 +100 records in file 60 +100 records in file 61 +100 records in file 62 +100 records in file 63 +100 records in file 64 +100 records in file 65 +100 records in file 66 +100 records in file 67 +100 records in file 68 +100 records in file 69 +100 records in file 70 +100 records in file 71 +100 records in file 72 +100 records in file 73 +100 records in file 74 +100 records in file 75 +100 records in file 76 +100 records in file 77 +100 records in file 78 +100 records in file 79 +100 records in file 80 +100 records in file 81 +100 records in file 82 +100 records in file 83 +100 records in file 84 +100 records in file 85 +100 records in file 86 +100 records in file 87 +100 records in file 88 +100 records in file 89 +100 records in file 90 +100 records in file 91 +100 records in file 92 +100 records in file 93 +100 records in file 94 +100 records in file 95 +100 records in file 96 +100 records in file 97 +100 records in file 98 +100 records in file 99 +100 records in file 100 +100 records in file 101 +100 records in file 102 +100 records in file 103 +100 records in file 104 +100 records in file 105 +100 records in file 106 +100 records in file 107 +100 records in file 108 +100 records in file 109 +100 records in file 110 +100 records in file 111 +100 records in file 112 +100 records in file 113 +100 records in file 114 +100 records in file 115 +100 records in file 116 +100 records in file 117 +100 records in file 118 +100 records in file 119 +100 records in file 120 +100 records in file 121 +100 records in file 122 +100 records in file 123 +100 records in file 124 +100 records in file 125 +100 records in file 126 +100 records in file 127 +100 records in file 128 +100 records in file 129 +100 records in file 130 +100 records in file 131 +100 records in file 132 +100 records in file 133 +100 records in file 134 +100 records in file 135 +100 records in file 136 +100 records in file 137 +100 records in file 138 +100 records in file 139 +100 records in file 140 +100 records in file 141 +100 records in file 142 +100 records in file 143 +100 records in file 144 +100 records in file 145 +100 records in file 146 +100 records in file 147 +100 records in file 148 +100 records in file 149 +100 records in file 150 +100 records in file 151 +100 records in file 152 +100 records in file 153 +100 records in file 154 +100 records in file 155 +100 records in file 156 +100 records in file 157 +100 records in file 158 +100 records in file 159 +100 records in file 160 +100 records in file 161 +100 records in file 162 +100 records in file 163 +100 records in file 164 +100 records in file 165 +100 records in file 166 +100 records in file 167 +100 records in file 168 +100 records in file 169 +100 records in file 170 +100 records in file 171 +100 records in file 172 +100 records in file 173 +100 records in file 174 +100 records in file 175 +100 records in file 176 +100 records in file 177 +100 records in file 178 +100 records in file 179 +100 records in file 180 +100 records in file 181 +100 records in file 182 +100 records in file 183 +100 records in file 184 +100 records in file 185 +100 records in file 186 +100 records in file 187 +100 records in file 188 +100 records in file 189 +100 records in file 190 +100 records in file 191 +100 records in file 192 +100 records in file 193 +100 records in file 194 +100 records in file 195 +100 records in file 196 +100 records in file 197 +100 records in file 198 +100 records in file 199 +100 records in file 200 diff --git a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/GarbageCollectionST.java b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/GarbageCollectionST.java index 84ff8a1392..541d6ce1e6 100644 --- a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/GarbageCollectionST.java +++ b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/GarbageCollectionST.java @@ -25,14 +25,20 @@ import sleeper.systemtest.dsl.SleeperSystemTest; import sleeper.systemtest.dsl.extension.AfterTestPurgeQueues; import sleeper.systemtest.dsl.extension.AfterTestReports; +import sleeper.systemtest.dsl.ingest.SystemTestDirectIngest; import sleeper.systemtest.dsl.reporting.SystemTestReports; import sleeper.systemtest.dsl.sourcedata.RecordNumbers; import sleeper.systemtest.suite.testutil.SystemTest; import java.nio.file.Path; +import java.util.HashSet; import java.util.Map; +import java.util.Set; +import java.util.stream.IntStream; import java.util.stream.LongStream; +import java.util.stream.StreamSupport; +import static java.util.stream.Collectors.toSet; import static org.assertj.core.api.Assertions.assertThat; import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_QUEUE_URL; import static sleeper.core.properties.table.TableProperty.COMPACTION_FILES_BATCH_SIZE; @@ -58,23 +64,25 @@ void shouldGarbageCollectFilesAfterCompaction(SleeperSystemTest sleeper) { // Given sleeper.updateTableProperties(Map.of( COMPACTION_STRATEGY_CLASS, BasicCompactionStrategy.class.getName(), - COMPACTION_FILES_BATCH_SIZE, "5", + COMPACTION_FILES_BATCH_SIZE, "10", GARBAGE_COLLECTOR_DELAY_BEFORE_DELETION, "0")); - RecordNumbers numbers = sleeper.scrambleNumberedRecords(LongStream.range(0, 50)); - sleeper.ingest().direct(tempDir) - .numberedRecords(numbers.range(0, 10)) - .numberedRecords(numbers.range(10, 20)) - .numberedRecords(numbers.range(20, 30)) - .numberedRecords(numbers.range(30, 40)) - .numberedRecords(numbers.range(40, 50)); - sleeper.compaction().createJobs(1).invokeTasks(1).waitForJobs(); + RecordNumbers numbers = sleeper.scrambleNumberedRecords(LongStream.range(0, 20_000)); + SystemTestDirectIngest ingest = sleeper.ingest().direct(tempDir); + IntStream.range(0, 2000) + .mapToObj(i -> numbers.range(i * 10, i * 10 + 10)) + .forEach(range -> ingest.numberedRecords(range)); + sleeper.compaction().createJobs(200).invokeTasks(1).waitForJobs(); // When sleeper.garbageCollection().invoke().waitFor(); // Then - assertThat(sleeper.directQuery().allRecordsInTable()) - .containsExactlyInAnyOrderElementsOf(sleeper.generateNumberedRecords(LongStream.range(0, 50))); + assertThat(new HashSet<>(sleeper.directQuery().allRecordsInTable())) + .isEqualTo(setFrom(sleeper.generateNumberedRecords(LongStream.range(0, 20_000)))); Approvals.verify(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all())); } + + private static Set setFrom(Iterable iterable) { + return StreamSupport.stream(iterable.spliterator(), false).collect(toSet()); + } } diff --git a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/approvals/GarbageCollectionST.shouldGarbageCollectFilesAfterCompaction.approved.txt b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/approvals/GarbageCollectionST.shouldGarbageCollectFilesAfterCompaction.approved.txt index acb9e171e4..95c9584864 100644 --- a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/approvals/GarbageCollectionST.shouldGarbageCollectFilesAfterCompaction.approved.txt +++ b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/approvals/GarbageCollectionST.shouldGarbageCollectFilesAfterCompaction.approved.txt @@ -1,4 +1,204 @@ Unreferenced files: 0 -Referenced files: 1 -File references: 1 -Partition at root: 50 records in file 1 +Referenced files: 200 +File references: 200 +Partition at root: +100 records in file 1 +100 records in file 2 +100 records in file 3 +100 records in file 4 +100 records in file 5 +100 records in file 6 +100 records in file 7 +100 records in file 8 +100 records in file 9 +100 records in file 10 +100 records in file 11 +100 records in file 12 +100 records in file 13 +100 records in file 14 +100 records in file 15 +100 records in file 16 +100 records in file 17 +100 records in file 18 +100 records in file 19 +100 records in file 20 +100 records in file 21 +100 records in file 22 +100 records in file 23 +100 records in file 24 +100 records in file 25 +100 records in file 26 +100 records in file 27 +100 records in file 28 +100 records in file 29 +100 records in file 30 +100 records in file 31 +100 records in file 32 +100 records in file 33 +100 records in file 34 +100 records in file 35 +100 records in file 36 +100 records in file 37 +100 records in file 38 +100 records in file 39 +100 records in file 40 +100 records in file 41 +100 records in file 42 +100 records in file 43 +100 records in file 44 +100 records in file 45 +100 records in file 46 +100 records in file 47 +100 records in file 48 +100 records in file 49 +100 records in file 50 +100 records in file 51 +100 records in file 52 +100 records in file 53 +100 records in file 54 +100 records in file 55 +100 records in file 56 +100 records in file 57 +100 records in file 58 +100 records in file 59 +100 records in file 60 +100 records in file 61 +100 records in file 62 +100 records in file 63 +100 records in file 64 +100 records in file 65 +100 records in file 66 +100 records in file 67 +100 records in file 68 +100 records in file 69 +100 records in file 70 +100 records in file 71 +100 records in file 72 +100 records in file 73 +100 records in file 74 +100 records in file 75 +100 records in file 76 +100 records in file 77 +100 records in file 78 +100 records in file 79 +100 records in file 80 +100 records in file 81 +100 records in file 82 +100 records in file 83 +100 records in file 84 +100 records in file 85 +100 records in file 86 +100 records in file 87 +100 records in file 88 +100 records in file 89 +100 records in file 90 +100 records in file 91 +100 records in file 92 +100 records in file 93 +100 records in file 94 +100 records in file 95 +100 records in file 96 +100 records in file 97 +100 records in file 98 +100 records in file 99 +100 records in file 100 +100 records in file 101 +100 records in file 102 +100 records in file 103 +100 records in file 104 +100 records in file 105 +100 records in file 106 +100 records in file 107 +100 records in file 108 +100 records in file 109 +100 records in file 110 +100 records in file 111 +100 records in file 112 +100 records in file 113 +100 records in file 114 +100 records in file 115 +100 records in file 116 +100 records in file 117 +100 records in file 118 +100 records in file 119 +100 records in file 120 +100 records in file 121 +100 records in file 122 +100 records in file 123 +100 records in file 124 +100 records in file 125 +100 records in file 126 +100 records in file 127 +100 records in file 128 +100 records in file 129 +100 records in file 130 +100 records in file 131 +100 records in file 132 +100 records in file 133 +100 records in file 134 +100 records in file 135 +100 records in file 136 +100 records in file 137 +100 records in file 138 +100 records in file 139 +100 records in file 140 +100 records in file 141 +100 records in file 142 +100 records in file 143 +100 records in file 144 +100 records in file 145 +100 records in file 146 +100 records in file 147 +100 records in file 148 +100 records in file 149 +100 records in file 150 +100 records in file 151 +100 records in file 152 +100 records in file 153 +100 records in file 154 +100 records in file 155 +100 records in file 156 +100 records in file 157 +100 records in file 158 +100 records in file 159 +100 records in file 160 +100 records in file 161 +100 records in file 162 +100 records in file 163 +100 records in file 164 +100 records in file 165 +100 records in file 166 +100 records in file 167 +100 records in file 168 +100 records in file 169 +100 records in file 170 +100 records in file 171 +100 records in file 172 +100 records in file 173 +100 records in file 174 +100 records in file 175 +100 records in file 176 +100 records in file 177 +100 records in file 178 +100 records in file 179 +100 records in file 180 +100 records in file 181 +100 records in file 182 +100 records in file 183 +100 records in file 184 +100 records in file 185 +100 records in file 186 +100 records in file 187 +100 records in file 188 +100 records in file 189 +100 records in file 190 +100 records in file 191 +100 records in file 192 +100 records in file 193 +100 records in file 194 +100 records in file 195 +100 records in file 196 +100 records in file 197 +100 records in file 198 +100 records in file 199 +100 records in file 200 From 1e1e911ce4e47cc28a024cf04aab0f8d1f0078e1 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 11 Nov 2024 04:01:49 +0000 Subject: [PATCH 02/20] Bump thiserror from 2.0.0 to 2.0.3 in /rust Bumps [thiserror](https://github.com/dtolnay/thiserror) from 2.0.0 to 2.0.3. - [Release notes](https://github.com/dtolnay/thiserror/releases) - [Commits](https://github.com/dtolnay/thiserror/compare/2.0.0...2.0.3) --- updated-dependencies: - dependency-name: thiserror dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- rust/Cargo.lock | 12 ++++++------ rust/compactor/Cargo.toml | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/rust/Cargo.lock b/rust/Cargo.lock index d32011203e..f8fdd939f7 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1065,7 +1065,7 @@ dependencies = [ "log", "num-format", "owo-colors 4.1.0", - "thiserror 2.0.0", + "thiserror 2.0.3", "tokio", "url", ] @@ -3804,11 +3804,11 @@ dependencies = [ [[package]] name = "thiserror" -version = "2.0.0" +version = "2.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15291287e9bff1bc6f9ff3409ed9af665bec7a5fc8ac079ea96be07bca0e2668" +checksum = "c006c85c7651b3cf2ada4584faa36773bd07bac24acfb39f3c431b36d7e667aa" dependencies = [ - "thiserror-impl 2.0.0", + "thiserror-impl 2.0.3", ] [[package]] @@ -3824,9 +3824,9 @@ dependencies = [ [[package]] name = "thiserror-impl" -version = "2.0.0" +version = "2.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22efd00f33f93fa62848a7cab956c3d38c8d43095efda1decfc2b3a5dc0b8972" +checksum = "f077553d607adc1caf65430528a576c757a71ed73944b66ebb58ef2bbd243568" dependencies = [ "proc-macro2", "quote", diff --git a/rust/compactor/Cargo.toml b/rust/compactor/Cargo.toml index 2cc0527ea7..753e3ef709 100644 --- a/rust/compactor/Cargo.toml +++ b/rust/compactor/Cargo.toml @@ -27,7 +27,7 @@ default-run = "main" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -thiserror = { version = "2.0.0" } # Derivable error enums +thiserror = { version = "2.0.3" } # Derivable error enums log = { version = "0.4.22", features = [ "release_max_level_debug", ] } # Standard logging framework From 0b22071b3c25f60ae921b0c0209576792428ae12 Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Mon, 11 Nov 2024 09:25:53 +0000 Subject: [PATCH 03/20] Raise timeout for GC in GarbageCollectionST --- .../dsl/gc/SystemTestGarbageCollection.java | 9 ++++++--- .../systemtest/dsl/util/PollWithRetriesDriver.java | 14 ++++++++------ .../systemtest/suite/GarbageCollectionST.java | 5 ++++- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/gc/SystemTestGarbageCollection.java b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/gc/SystemTestGarbageCollection.java index 89c074d10a..14bf465b54 100644 --- a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/gc/SystemTestGarbageCollection.java +++ b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/gc/SystemTestGarbageCollection.java @@ -16,6 +16,7 @@ package sleeper.systemtest.dsl.gc; +import sleeper.core.util.PollWithRetries; import sleeper.systemtest.dsl.SystemTestContext; import sleeper.systemtest.dsl.SystemTestDrivers; import sleeper.systemtest.dsl.instance.SystemTestInstanceContext; @@ -42,8 +43,10 @@ public SystemTestGarbageCollection invoke() { } public void waitFor() { - WaitForGC.waitUntilNoUnreferencedFiles(instance, - pollDriver.pollWithIntervalAndTimeout( - Duration.ofSeconds(5), Duration.ofSeconds(60))); + waitFor(PollWithRetries.intervalAndPollingTimeout(Duration.ofSeconds(5), Duration.ofMinutes(1))); + } + + public void waitFor(PollWithRetries poll) { + WaitForGC.waitUntilNoUnreferencedFiles(instance, pollDriver.poll(poll)); } } diff --git a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/util/PollWithRetriesDriver.java b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/util/PollWithRetriesDriver.java index 93fcfd414d..e86025cee9 100644 --- a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/util/PollWithRetriesDriver.java +++ b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/util/PollWithRetriesDriver.java @@ -22,18 +22,20 @@ @FunctionalInterface public interface PollWithRetriesDriver { - PollWithRetries pollWithIntervalAndTimeout(Duration pollInterval, Duration timeout); + PollWithRetries poll(PollWithRetries config); + + default PollWithRetries pollWithIntervalAndTimeout(Duration pollInterval, Duration timeout) { + return poll(PollWithRetries.intervalAndPollingTimeout(pollInterval, timeout)); + } static PollWithRetriesDriver realWaits() { - return PollWithRetries::intervalAndPollingTimeout; + return poll -> poll; } static PollWithRetriesDriver noWaits() { - return (pollInterval, timeout) -> PollWithRetries.builder() - .pollIntervalAndTimeout(pollInterval, timeout) + return poll -> poll.toBuilder() .sleepInInterval(millis -> { // Do not really wait - }) - .build(); + }).build(); } diff --git a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/GarbageCollectionST.java b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/GarbageCollectionST.java index 541d6ce1e6..a58d06fba3 100644 --- a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/GarbageCollectionST.java +++ b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/GarbageCollectionST.java @@ -22,6 +22,7 @@ import org.junit.jupiter.api.io.TempDir; import sleeper.compaction.core.strategy.impl.BasicCompactionStrategy; +import sleeper.core.util.PollWithRetries; import sleeper.systemtest.dsl.SleeperSystemTest; import sleeper.systemtest.dsl.extension.AfterTestPurgeQueues; import sleeper.systemtest.dsl.extension.AfterTestReports; @@ -31,6 +32,7 @@ import sleeper.systemtest.suite.testutil.SystemTest; import java.nio.file.Path; +import java.time.Duration; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -74,7 +76,8 @@ void shouldGarbageCollectFilesAfterCompaction(SleeperSystemTest sleeper) { sleeper.compaction().createJobs(200).invokeTasks(1).waitForJobs(); // When - sleeper.garbageCollection().invoke().waitFor(); + sleeper.garbageCollection().invoke().waitFor( + PollWithRetries.intervalAndPollingTimeout(Duration.ofSeconds(10), Duration.ofMinutes(5))); // Then assertThat(new HashSet<>(sleeper.directQuery().allRecordsInTable())) From 165790dcac5b03a2ba8e245d7dde6f7614c24ca5 Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Mon, 11 Nov 2024 10:23:26 +0000 Subject: [PATCH 04/20] Pre-split partitions in GarbageCollectionST --- .../dsl/gc/GarbageCollectionTest.java | 35 ++- ...geCollectFilesAfterCompaction.approved.txt | 204 ------------------ .../dsl}/testutil/PartitionsTestHelper.java | 6 +- .../drivers/InMemoryDirectIngestDriver.java | 2 +- .../drivers/InMemoryPartitionFileWriter.java | 12 +- .../testutil/drivers/InMemoryRecordBatch.java | 13 ++ .../systemtest/suite/EksBulkImportST.java | 2 +- .../suite/EmrBulkImportPerformanceST.java | 2 +- .../suite/EmrPersistentBulkImportST.java | 2 +- .../systemtest/suite/GarbageCollectionST.java | 30 ++- .../systemtest/suite/IngestPerformanceST.java | 2 +- .../suite/ParallelCompactionsST.java | 2 +- 12 files changed, 83 insertions(+), 229 deletions(-) delete mode 100644 java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/gc/GarbageCollectionTest.shouldGarbageCollectFilesAfterCompaction.approved.txt rename java/system-test/{system-test-suite/src/test/java/sleeper/systemtest/suite => system-test-dsl/src/test/java/sleeper/systemtest/dsl}/testutil/PartitionsTestHelper.java (92%) diff --git a/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/gc/GarbageCollectionTest.java b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/gc/GarbageCollectionTest.java index f3f42bbf35..808d4e4dd3 100644 --- a/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/gc/GarbageCollectionTest.java +++ b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/gc/GarbageCollectionTest.java @@ -15,11 +15,12 @@ */ package sleeper.systemtest.dsl.gc; -import org.approvaltests.Approvals; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import sleeper.compaction.core.strategy.impl.BasicCompactionStrategy; +import sleeper.core.properties.validation.IngestFileWritingStrategy; +import sleeper.core.util.PollWithRetries; import sleeper.systemtest.dsl.SleeperSystemTest; import sleeper.systemtest.dsl.extension.AfterTestPurgeQueues; import sleeper.systemtest.dsl.extension.AfterTestReports; @@ -28,9 +29,11 @@ import sleeper.systemtest.dsl.testutil.InMemoryDslTest; import java.nio.file.Path; +import java.time.Duration; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.stream.IntStream; import java.util.stream.LongStream; import java.util.stream.StreamSupport; @@ -40,8 +43,13 @@ import static sleeper.core.properties.table.TableProperty.COMPACTION_FILES_BATCH_SIZE; import static sleeper.core.properties.table.TableProperty.COMPACTION_STRATEGY_CLASS; import static sleeper.core.properties.table.TableProperty.GARBAGE_COLLECTOR_DELAY_BEFORE_DELETION; -import static sleeper.core.testutils.printers.FileReferencePrinter.printFiles; +import static sleeper.core.properties.table.TableProperty.INGEST_FILE_WRITING_STRATEGY; +import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValue.addPrefix; +import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValue.numberStringAndZeroPadTo; +import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValueOverrides.overrideField; import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.MAIN; +import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.ROW_KEY_FIELD_NAME; +import static sleeper.systemtest.dsl.testutil.PartitionsTestHelper.partitionsBuilder; @InMemoryDslTest public class GarbageCollectionTest { @@ -55,24 +63,35 @@ void setUp(SleeperSystemTest sleeper, AfterTestReports reporting, AfterTestPurge @Test void shouldGarbageCollectFilesAfterCompaction(SleeperSystemTest sleeper) { // Given + sleeper.setGeneratorOverrides(overrideField(ROW_KEY_FIELD_NAME, + numberStringAndZeroPadTo(5).then(addPrefix("row-")))); + sleeper.partitioning().setPartitions(partitionsBuilder(sleeper) + .rootFirst("root") + .splitToNewChildren("root", UUID.randomUUID().toString(), UUID.randomUUID().toString(), "row-50000") + .buildTree()); sleeper.updateTableProperties(Map.of( + INGEST_FILE_WRITING_STRATEGY, IngestFileWritingStrategy.ONE_FILE_PER_LEAF.toString(), COMPACTION_STRATEGY_CLASS, BasicCompactionStrategy.class.getName(), COMPACTION_FILES_BATCH_SIZE, "10", GARBAGE_COLLECTOR_DELAY_BEFORE_DELETION, "0")); - RecordNumbers numbers = sleeper.scrambleNumberedRecords(LongStream.range(0, 20_000)); + RecordNumbers numbers = sleeper.scrambleNumberedRecords(LongStream.range(0, 100_000)); SystemTestDirectIngest ingest = sleeper.ingest().direct(tempDir); - IntStream.range(0, 2000) - .mapToObj(i -> numbers.range(i * 10, i * 10 + 10)) + IntStream.range(0, 1000) + .mapToObj(i -> numbers.range(i * 100, i * 100 + 100)) .forEach(range -> ingest.numberedRecords(range)); sleeper.compaction().createJobs(200).invokeTasks(1).waitForJobs(); // When - sleeper.garbageCollection().invoke().waitFor(); + sleeper.garbageCollection().invoke().waitFor( + PollWithRetries.intervalAndPollingTimeout(Duration.ofSeconds(10), Duration.ofMinutes(5))); // Then assertThat(new HashSet<>(sleeper.directQuery().allRecordsInTable())) - .isEqualTo(setFrom(sleeper.generateNumberedRecords(LongStream.range(0, 20_000)))); - Approvals.verify(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all())); + .isEqualTo(setFrom(sleeper.generateNumberedRecords(LongStream.range(0, 100_000)))); + assertThat(sleeper.tableFiles().all()).satisfies(files -> { + assertThat(files.getFilesWithNoReferences()).isEmpty(); + assertThat(files.listFileReferences()).hasSize(200); + }); } private static Set setFrom(Iterable iterable) { diff --git a/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/gc/GarbageCollectionTest.shouldGarbageCollectFilesAfterCompaction.approved.txt b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/gc/GarbageCollectionTest.shouldGarbageCollectFilesAfterCompaction.approved.txt deleted file mode 100644 index 95c9584864..0000000000 --- a/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/gc/GarbageCollectionTest.shouldGarbageCollectFilesAfterCompaction.approved.txt +++ /dev/null @@ -1,204 +0,0 @@ -Unreferenced files: 0 -Referenced files: 200 -File references: 200 -Partition at root: -100 records in file 1 -100 records in file 2 -100 records in file 3 -100 records in file 4 -100 records in file 5 -100 records in file 6 -100 records in file 7 -100 records in file 8 -100 records in file 9 -100 records in file 10 -100 records in file 11 -100 records in file 12 -100 records in file 13 -100 records in file 14 -100 records in file 15 -100 records in file 16 -100 records in file 17 -100 records in file 18 -100 records in file 19 -100 records in file 20 -100 records in file 21 -100 records in file 22 -100 records in file 23 -100 records in file 24 -100 records in file 25 -100 records in file 26 -100 records in file 27 -100 records in file 28 -100 records in file 29 -100 records in file 30 -100 records in file 31 -100 records in file 32 -100 records in file 33 -100 records in file 34 -100 records in file 35 -100 records in file 36 -100 records in file 37 -100 records in file 38 -100 records in file 39 -100 records in file 40 -100 records in file 41 -100 records in file 42 -100 records in file 43 -100 records in file 44 -100 records in file 45 -100 records in file 46 -100 records in file 47 -100 records in file 48 -100 records in file 49 -100 records in file 50 -100 records in file 51 -100 records in file 52 -100 records in file 53 -100 records in file 54 -100 records in file 55 -100 records in file 56 -100 records in file 57 -100 records in file 58 -100 records in file 59 -100 records in file 60 -100 records in file 61 -100 records in file 62 -100 records in file 63 -100 records in file 64 -100 records in file 65 -100 records in file 66 -100 records in file 67 -100 records in file 68 -100 records in file 69 -100 records in file 70 -100 records in file 71 -100 records in file 72 -100 records in file 73 -100 records in file 74 -100 records in file 75 -100 records in file 76 -100 records in file 77 -100 records in file 78 -100 records in file 79 -100 records in file 80 -100 records in file 81 -100 records in file 82 -100 records in file 83 -100 records in file 84 -100 records in file 85 -100 records in file 86 -100 records in file 87 -100 records in file 88 -100 records in file 89 -100 records in file 90 -100 records in file 91 -100 records in file 92 -100 records in file 93 -100 records in file 94 -100 records in file 95 -100 records in file 96 -100 records in file 97 -100 records in file 98 -100 records in file 99 -100 records in file 100 -100 records in file 101 -100 records in file 102 -100 records in file 103 -100 records in file 104 -100 records in file 105 -100 records in file 106 -100 records in file 107 -100 records in file 108 -100 records in file 109 -100 records in file 110 -100 records in file 111 -100 records in file 112 -100 records in file 113 -100 records in file 114 -100 records in file 115 -100 records in file 116 -100 records in file 117 -100 records in file 118 -100 records in file 119 -100 records in file 120 -100 records in file 121 -100 records in file 122 -100 records in file 123 -100 records in file 124 -100 records in file 125 -100 records in file 126 -100 records in file 127 -100 records in file 128 -100 records in file 129 -100 records in file 130 -100 records in file 131 -100 records in file 132 -100 records in file 133 -100 records in file 134 -100 records in file 135 -100 records in file 136 -100 records in file 137 -100 records in file 138 -100 records in file 139 -100 records in file 140 -100 records in file 141 -100 records in file 142 -100 records in file 143 -100 records in file 144 -100 records in file 145 -100 records in file 146 -100 records in file 147 -100 records in file 148 -100 records in file 149 -100 records in file 150 -100 records in file 151 -100 records in file 152 -100 records in file 153 -100 records in file 154 -100 records in file 155 -100 records in file 156 -100 records in file 157 -100 records in file 158 -100 records in file 159 -100 records in file 160 -100 records in file 161 -100 records in file 162 -100 records in file 163 -100 records in file 164 -100 records in file 165 -100 records in file 166 -100 records in file 167 -100 records in file 168 -100 records in file 169 -100 records in file 170 -100 records in file 171 -100 records in file 172 -100 records in file 173 -100 records in file 174 -100 records in file 175 -100 records in file 176 -100 records in file 177 -100 records in file 178 -100 records in file 179 -100 records in file 180 -100 records in file 181 -100 records in file 182 -100 records in file 183 -100 records in file 184 -100 records in file 185 -100 records in file 186 -100 records in file 187 -100 records in file 188 -100 records in file 189 -100 records in file 190 -100 records in file 191 -100 records in file 192 -100 records in file 193 -100 records in file 194 -100 records in file 195 -100 records in file 196 -100 records in file 197 -100 records in file 198 -100 records in file 199 -100 records in file 200 diff --git a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/testutil/PartitionsTestHelper.java b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/PartitionsTestHelper.java similarity index 92% rename from java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/testutil/PartitionsTestHelper.java rename to java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/PartitionsTestHelper.java index 5eec8f05f8..fe4cb5b014 100644 --- a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/testutil/PartitionsTestHelper.java +++ b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/PartitionsTestHelper.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package sleeper.systemtest.suite.testutil; +package sleeper.systemtest.dsl.testutil; import sleeper.core.partition.PartitionTree; import sleeper.core.partition.PartitionsBuilder; @@ -34,6 +34,10 @@ public class PartitionsTestHelper { private PartitionsTestHelper() { } + public static PartitionTree create2StringPartitions(SleeperSystemTest sleeper) { + return createStringPartitionsFromSplitPointsDirectory(sleeper, "2-partitions.txt"); + } + public static PartitionTree create128StringPartitions(SleeperSystemTest sleeper) { return createStringPartitionsFromSplitPointsDirectory(sleeper, "128-partitions.txt"); } diff --git a/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/drivers/InMemoryDirectIngestDriver.java b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/drivers/InMemoryDirectIngestDriver.java index b01de15c2d..0c23d395c3 100644 --- a/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/drivers/InMemoryDirectIngestDriver.java +++ b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/drivers/InMemoryDirectIngestDriver.java @@ -59,7 +59,7 @@ public IngestResult ingest( StateStore stateStore, AddFilesToStateStore addFilesToStateStore, Iterator records) { try (IngestCoordinator coordinator = IngestCoordinator.builderWith(instanceProperties, tableProperties) .objectFactory(ObjectFactory.noUserJars()) - .recordBatchFactory(InMemoryRecordBatch::new) + .recordBatchFactory(() -> new InMemoryRecordBatch(tableProperties.getSchema())) .partitionFileWriterFactory(InMemoryPartitionFileWriter.factory(data, sketches, instanceProperties, tableProperties)) .stateStore(stateStore) .addFilesToStateStore(addFilesToStateStore) diff --git a/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/drivers/InMemoryPartitionFileWriter.java b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/drivers/InMemoryPartitionFileWriter.java index 9f7cb54b6c..1318509c8e 100644 --- a/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/drivers/InMemoryPartitionFileWriter.java +++ b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/drivers/InMemoryPartitionFileWriter.java @@ -16,12 +16,16 @@ package sleeper.systemtest.dsl.testutil.drivers; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import sleeper.core.partition.Partition; import sleeper.core.properties.instance.InstanceProperties; import sleeper.core.properties.table.TableProperties; import sleeper.core.record.Record; import sleeper.core.schema.Schema; import sleeper.core.statestore.FileReference; +import sleeper.core.table.TableFilePaths; import sleeper.ingest.runner.impl.partitionfilewriter.PartitionFileWriter; import sleeper.ingest.runner.impl.partitionfilewriter.PartitionFileWriterFactory; import sleeper.query.runner.recordretrieval.InMemoryDataStore; @@ -37,6 +41,7 @@ import static sleeper.core.properties.table.TableProperty.TABLE_ID; public class InMemoryPartitionFileWriter implements PartitionFileWriter { + public static final Logger LOGGER = LoggerFactory.getLogger(InMemoryPartitionFileWriter.class); private final InMemoryDataStore dataStore; private final InMemorySketchesStore sketchesStore; @@ -57,11 +62,11 @@ private InMemoryPartitionFileWriter(InMemoryDataStore dataStore, InMemorySketche public static PartitionFileWriterFactory factory( InMemoryDataStore data, InMemorySketchesStore sketches, InstanceProperties instanceProperties, TableProperties tableProperties) { - String filePathPrefix = instanceProperties.get(FILE_SYSTEM) + TableFilePaths filePaths = TableFilePaths.fromPrefix(instanceProperties.get(FILE_SYSTEM) + instanceProperties.get(DATA_BUCKET) + "/" - + tableProperties.get(TABLE_ID); + + tableProperties.get(TABLE_ID)); return partition -> new InMemoryPartitionFileWriter( - data, sketches, partition, filePathPrefix + "/" + UUID.randomUUID() + ".parquet", tableProperties.getSchema()); + data, sketches, partition, filePaths.constructPartitionParquetFilePath(partition, UUID.randomUUID().toString()), tableProperties.getSchema()); } @Override @@ -74,6 +79,7 @@ public void append(Record record) { public CompletableFuture close() { dataStore.addFile(filename, records); sketchesStore.addSketchForFile(filename, sketches); + LOGGER.info("Wrote file with {} records: {}", records.size(), filename); return CompletableFuture.completedFuture(FileReference.builder() .filename(filename) .partitionId(partition.getId()) diff --git a/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/drivers/InMemoryRecordBatch.java b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/drivers/InMemoryRecordBatch.java index 1201e1f3e8..14f8797270 100644 --- a/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/drivers/InMemoryRecordBatch.java +++ b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/drivers/InMemoryRecordBatch.java @@ -16,18 +16,29 @@ package sleeper.systemtest.dsl.testutil.drivers; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import sleeper.core.iterator.CloseableIterator; import sleeper.core.iterator.WrappedIterator; import sleeper.core.record.Record; +import sleeper.core.record.RecordComparator; +import sleeper.core.schema.Schema; import sleeper.ingest.runner.impl.recordbatch.RecordBatch; import java.util.ArrayList; import java.util.List; public class InMemoryRecordBatch implements RecordBatch { + public static final Logger LOGGER = LoggerFactory.getLogger(InMemoryRecordBatch.class); + private final Schema schema; private final List records = new ArrayList<>(); + public InMemoryRecordBatch(Schema schema) { + this.schema = schema; + } + @Override public void append(Record data) { records.add(data); @@ -40,10 +51,12 @@ public boolean isFull() { @Override public CloseableIterator createOrderedRecordIterator() { + records.sort(new RecordComparator(schema)); return new WrappedIterator<>(records.iterator()); } @Override public void close() { + LOGGER.info("Closing batch with {} records", records.size()); } } diff --git a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/EksBulkImportST.java b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/EksBulkImportST.java index 0667e34c91..db3dfb6347 100644 --- a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/EksBulkImportST.java +++ b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/EksBulkImportST.java @@ -39,8 +39,8 @@ import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValue.addPrefix; import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValue.numberStringAndZeroPadTo; import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValueOverrides.overrideField; +import static sleeper.systemtest.dsl.testutil.PartitionsTestHelper.partitionsBuilder; import static sleeper.systemtest.suite.fixtures.SystemTestInstance.BULK_IMPORT_EKS; -import static sleeper.systemtest.suite.testutil.PartitionsTestHelper.partitionsBuilder; @SystemTest // Slow because it needs to do two CDK deployments, one to add the EKS cluster and one to remove it. diff --git a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/EmrBulkImportPerformanceST.java b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/EmrBulkImportPerformanceST.java index cd6aea3150..088af849d9 100644 --- a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/EmrBulkImportPerformanceST.java +++ b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/EmrBulkImportPerformanceST.java @@ -34,9 +34,9 @@ import static sleeper.systemtest.configuration.SystemTestProperty.INGEST_MODE; import static sleeper.systemtest.configuration.SystemTestProperty.NUMBER_OF_RECORDS_PER_INGEST; import static sleeper.systemtest.configuration.SystemTestProperty.NUMBER_OF_WRITERS; +import static sleeper.systemtest.dsl.testutil.PartitionsTestHelper.create512StringPartitions; import static sleeper.systemtest.suite.fixtures.SystemTestInstance.BULK_IMPORT_PERFORMANCE; import static sleeper.systemtest.suite.testutil.FileReferenceSystemTestHelper.numberOfRecordsIn; -import static sleeper.systemtest.suite.testutil.PartitionsTestHelper.create512StringPartitions; @SystemTest @Expensive // Expensive because it takes a lot of very costly EMR instances to import this many records. diff --git a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/EmrPersistentBulkImportST.java b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/EmrPersistentBulkImportST.java index dc59829168..dc28a06a37 100644 --- a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/EmrPersistentBulkImportST.java +++ b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/EmrPersistentBulkImportST.java @@ -37,8 +37,8 @@ import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValue.addPrefix; import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValue.numberStringAndZeroPadTo; import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValueOverrides.overrideField; +import static sleeper.systemtest.dsl.testutil.PartitionsTestHelper.partitionsBuilder; import static sleeper.systemtest.suite.fixtures.SystemTestInstance.BULK_IMPORT_PERSISTENT_EMR; -import static sleeper.systemtest.suite.testutil.PartitionsTestHelper.partitionsBuilder; @SystemTest // Slow because it needs to do two CDK deployments, one to add the EMR cluster and one to remove it. diff --git a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/GarbageCollectionST.java b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/GarbageCollectionST.java index a58d06fba3..102bda7d69 100644 --- a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/GarbageCollectionST.java +++ b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/GarbageCollectionST.java @@ -16,12 +16,12 @@ package sleeper.systemtest.suite; -import org.approvaltests.Approvals; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import sleeper.compaction.core.strategy.impl.BasicCompactionStrategy; +import sleeper.core.properties.validation.IngestFileWritingStrategy; import sleeper.core.util.PollWithRetries; import sleeper.systemtest.dsl.SleeperSystemTest; import sleeper.systemtest.dsl.extension.AfterTestPurgeQueues; @@ -36,6 +36,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.stream.IntStream; import java.util.stream.LongStream; import java.util.stream.StreamSupport; @@ -46,8 +47,13 @@ import static sleeper.core.properties.table.TableProperty.COMPACTION_FILES_BATCH_SIZE; import static sleeper.core.properties.table.TableProperty.COMPACTION_STRATEGY_CLASS; import static sleeper.core.properties.table.TableProperty.GARBAGE_COLLECTOR_DELAY_BEFORE_DELETION; -import static sleeper.core.testutils.printers.FileReferencePrinter.printFiles; +import static sleeper.core.properties.table.TableProperty.INGEST_FILE_WRITING_STRATEGY; +import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValue.addPrefix; +import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValue.numberStringAndZeroPadTo; +import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValueOverrides.overrideField; +import static sleeper.systemtest.dsl.testutil.PartitionsTestHelper.partitionsBuilder; import static sleeper.systemtest.suite.fixtures.SystemTestInstance.MAIN; +import static sleeper.systemtest.suite.fixtures.SystemTestSchema.ROW_KEY_FIELD_NAME; @SystemTest public class GarbageCollectionST { @@ -64,14 +70,21 @@ void setUp(SleeperSystemTest sleeper, AfterTestReports reporting, AfterTestPurge @Test void shouldGarbageCollectFilesAfterCompaction(SleeperSystemTest sleeper) { // Given + sleeper.setGeneratorOverrides(overrideField(ROW_KEY_FIELD_NAME, + numberStringAndZeroPadTo(5).then(addPrefix("row-")))); + sleeper.partitioning().setPartitions(partitionsBuilder(sleeper) + .rootFirst("root") + .splitToNewChildren("root", UUID.randomUUID().toString(), UUID.randomUUID().toString(), "row-50000") + .buildTree()); sleeper.updateTableProperties(Map.of( + INGEST_FILE_WRITING_STRATEGY, IngestFileWritingStrategy.ONE_FILE_PER_LEAF.toString(), COMPACTION_STRATEGY_CLASS, BasicCompactionStrategy.class.getName(), COMPACTION_FILES_BATCH_SIZE, "10", GARBAGE_COLLECTOR_DELAY_BEFORE_DELETION, "0")); - RecordNumbers numbers = sleeper.scrambleNumberedRecords(LongStream.range(0, 20_000)); + RecordNumbers numbers = sleeper.scrambleNumberedRecords(LongStream.range(0, 100_000)); SystemTestDirectIngest ingest = sleeper.ingest().direct(tempDir); - IntStream.range(0, 2000) - .mapToObj(i -> numbers.range(i * 10, i * 10 + 10)) + IntStream.range(0, 1000) + .mapToObj(i -> numbers.range(i * 100, i * 100 + 100)) .forEach(range -> ingest.numberedRecords(range)); sleeper.compaction().createJobs(200).invokeTasks(1).waitForJobs(); @@ -81,8 +94,11 @@ void shouldGarbageCollectFilesAfterCompaction(SleeperSystemTest sleeper) { // Then assertThat(new HashSet<>(sleeper.directQuery().allRecordsInTable())) - .isEqualTo(setFrom(sleeper.generateNumberedRecords(LongStream.range(0, 20_000)))); - Approvals.verify(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all())); + .isEqualTo(setFrom(sleeper.generateNumberedRecords(LongStream.range(0, 100_000)))); + assertThat(sleeper.tableFiles().all()).satisfies(files -> { + assertThat(files.getFilesWithNoReferences()).isEmpty(); + assertThat(files.listFileReferences()).hasSize(200); + }); } private static Set setFrom(Iterable iterable) { diff --git a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/IngestPerformanceST.java b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/IngestPerformanceST.java index fb98c10908..1825a363c9 100644 --- a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/IngestPerformanceST.java +++ b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/IngestPerformanceST.java @@ -35,9 +35,9 @@ import static sleeper.systemtest.configuration.SystemTestProperty.INGEST_QUEUE; import static sleeper.systemtest.configuration.SystemTestProperty.NUMBER_OF_RECORDS_PER_INGEST; import static sleeper.systemtest.configuration.SystemTestProperty.NUMBER_OF_WRITERS; +import static sleeper.systemtest.dsl.testutil.PartitionsTestHelper.create128StringPartitions; import static sleeper.systemtest.suite.fixtures.SystemTestInstance.INGEST_PERFORMANCE; import static sleeper.systemtest.suite.testutil.FileReferenceSystemTestHelper.numberOfRecordsIn; -import static sleeper.systemtest.suite.testutil.PartitionsTestHelper.create128StringPartitions; @SystemTest @Expensive // Expensive because it takes a long time to ingest this many records on fairly large ECS instances. diff --git a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/ParallelCompactionsST.java b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/ParallelCompactionsST.java index e74898bbc4..f282bc29f6 100644 --- a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/ParallelCompactionsST.java +++ b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/ParallelCompactionsST.java @@ -42,8 +42,8 @@ import static sleeper.systemtest.configuration.SystemTestProperty.INGEST_MODE; import static sleeper.systemtest.configuration.SystemTestProperty.NUMBER_OF_RECORDS_PER_INGEST; import static sleeper.systemtest.configuration.SystemTestProperty.NUMBER_OF_WRITERS; +import static sleeper.systemtest.dsl.testutil.PartitionsTestHelper.create8192StringPartitions; import static sleeper.systemtest.suite.fixtures.SystemTestInstance.PARALLEL_COMPACTIONS; -import static sleeper.systemtest.suite.testutil.PartitionsTestHelper.create8192StringPartitions; @SystemTest @Expensive From 3b134bd15a5c7a8b6748d65f13c60944043547ce Mon Sep 17 00:00:00 2001 From: rtjd6554 <174791724+rtjd6554@users.noreply.github.com> Date: Mon, 11 Nov 2024 10:26:48 +0000 Subject: [PATCH 05/20] Update for DomPurify suppression --- code-style/dependency-check-suppressions.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/code-style/dependency-check-suppressions.xml b/code-style/dependency-check-suppressions.xml index fe7c90a2ad..48a0c1fbc3 100644 --- a/code-style/dependency-check-suppressions.xml +++ b/code-style/dependency-check-suppressions.xml @@ -298,7 +298,7 @@ we're using uses Jetty 9. ]]> ^pkg:javascript/DOMPurify@.*$ - CVE-2024-45801|CVE-2024-47875 + CVE-2024-45801|CVE-2024-47875|CVE-2024-48910 Date: Mon, 11 Nov 2024 10:34:47 +0000 Subject: [PATCH 06/20] Query by queue in GarbageCollectionST --- .../java/sleeper/systemtest/dsl/gc/GarbageCollectionTest.java | 2 +- .../test/java/sleeper/systemtest/suite/GarbageCollectionST.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/gc/GarbageCollectionTest.java b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/gc/GarbageCollectionTest.java index 808d4e4dd3..5f36afb5a8 100644 --- a/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/gc/GarbageCollectionTest.java +++ b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/gc/GarbageCollectionTest.java @@ -86,7 +86,7 @@ void shouldGarbageCollectFilesAfterCompaction(SleeperSystemTest sleeper) { PollWithRetries.intervalAndPollingTimeout(Duration.ofSeconds(10), Duration.ofMinutes(5))); // Then - assertThat(new HashSet<>(sleeper.directQuery().allRecordsInTable())) + assertThat(new HashSet<>(sleeper.query().byQueue().allRecordsInTable())) .isEqualTo(setFrom(sleeper.generateNumberedRecords(LongStream.range(0, 100_000)))); assertThat(sleeper.tableFiles().all()).satisfies(files -> { assertThat(files.getFilesWithNoReferences()).isEmpty(); diff --git a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/GarbageCollectionST.java b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/GarbageCollectionST.java index 102bda7d69..c016e5ab4d 100644 --- a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/GarbageCollectionST.java +++ b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/GarbageCollectionST.java @@ -93,7 +93,7 @@ void shouldGarbageCollectFilesAfterCompaction(SleeperSystemTest sleeper) { PollWithRetries.intervalAndPollingTimeout(Duration.ofSeconds(10), Duration.ofMinutes(5))); // Then - assertThat(new HashSet<>(sleeper.directQuery().allRecordsInTable())) + assertThat(new HashSet<>(sleeper.query().byQueue().allRecordsInTable())) .isEqualTo(setFrom(sleeper.generateNumberedRecords(LongStream.range(0, 100_000)))); assertThat(sleeper.tableFiles().all()).satisfies(files -> { assertThat(files.getFilesWithNoReferences()).isEmpty(); From f287eb54a5f99d6ee72b2b974e28bbead20bc06e Mon Sep 17 00:00:00 2001 From: rtjd6554 <174791724+rtjd6554@users.noreply.github.com> Date: Mon, 11 Nov 2024 10:54:52 +0000 Subject: [PATCH 07/20] Refine jetty regex for suppression --- code-style/dependency-check-suppressions.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/code-style/dependency-check-suppressions.xml b/code-style/dependency-check-suppressions.xml index 48a0c1fbc3..1b41670911 100644 --- a/code-style/dependency-check-suppressions.xml +++ b/code-style/dependency-check-suppressions.xml @@ -288,7 +288,7 @@ We're not using the HttpURI class directly, and the CVE states that Jetty is not vulnerable unless you use that directly. ]]> - ^pkg:maven/org\.eclipse\.jetty/jetty-http@.*$ + ^^pkg:maven/org\.eclipse\.jetty\*$ CVE-2024-6763 From 2f7f2642d2a61285a471cfaf38ad418e48f7700a Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Mon, 11 Nov 2024 11:03:22 +0000 Subject: [PATCH 08/20] Extract StateStoreCommitRequestInS3Uploader --- .../StateStoreCommitRequestInS3Uploader.java | 78 +++++++++++++++++++ .../impl/commit/AddFilesToStateStore.java | 13 +--- 2 files changed, 81 insertions(+), 10 deletions(-) create mode 100644 java/core/src/main/java/sleeper/core/statestore/commit/StateStoreCommitRequestInS3Uploader.java diff --git a/java/core/src/main/java/sleeper/core/statestore/commit/StateStoreCommitRequestInS3Uploader.java b/java/core/src/main/java/sleeper/core/statestore/commit/StateStoreCommitRequestInS3Uploader.java new file mode 100644 index 0000000000..61ca3260e4 --- /dev/null +++ b/java/core/src/main/java/sleeper/core/statestore/commit/StateStoreCommitRequestInS3Uploader.java @@ -0,0 +1,78 @@ +/* + * Copyright 2022-2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package sleeper.core.statestore.commit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import sleeper.core.properties.instance.InstanceProperties; + +import java.util.function.Supplier; + +import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.DATA_BUCKET; + +/** + * Handles uploading commit requests to S3 if they are too big to fit in an SQS message. + */ +public class StateStoreCommitRequestInS3Uploader { + public static final Logger LOGGER = LoggerFactory.getLogger(StateStoreCommitRequestInS3Uploader.class); + + private final InstanceProperties instanceProperties; + private final Client client; + private final Supplier filenameSupplier; + private final StateStoreCommitRequestInS3SerDe serDe = new StateStoreCommitRequestInS3SerDe(); + + public StateStoreCommitRequestInS3Uploader(InstanceProperties instanceProperties, Client client, Supplier filenameSupplier) { + this.instanceProperties = instanceProperties; + this.client = client; + this.filenameSupplier = filenameSupplier; + } + + /** + * Checks whether a state store commit request JSON will fit in an SQS message. If not, uploads it to S3 and creates + * a new commit request referencing the original JSON in S3. + * + * @param tableId the Sleeper table ID + * @param commitRequestJson the commit request JSON + * @return the commit request if it fits in an SQS message, or a new commit request referencing S3 + */ + public String uploadAndWrapIfTooBig(String tableId, String commitRequestJson) { + // Store in S3 if the request will not fit in an SQS message + if (commitRequestJson.length() > 262144) { + String s3Key = StateStoreCommitRequestInS3.createFileS3Key(tableId, filenameSupplier.get()); + client.putObject(instanceProperties.get(DATA_BUCKET), s3Key, commitRequestJson); + LOGGER.info("Request was too big for an SQS message. Will submit a reference to file in data bucket: {}", s3Key); + return serDe.toJson(new StateStoreCommitRequestInS3(s3Key)); + } else { + return commitRequestJson; + } + } + + /** + * A client to upload an object to S3. + */ + public interface Client { + + /** + * Uploads an object to an S3 bucket. + * + * @param bucketName the bucket name + * @param key the key to upload to + * @param content the content of the file to upload + */ + void putObject(String bucketName, String key, String content); + } +} diff --git a/java/ingest/ingest-runner/src/main/java/sleeper/ingest/runner/impl/commit/AddFilesToStateStore.java b/java/ingest/ingest-runner/src/main/java/sleeper/ingest/runner/impl/commit/AddFilesToStateStore.java index fb9c9cbdfd..6aecdd68bf 100644 --- a/java/ingest/ingest-runner/src/main/java/sleeper/ingest/runner/impl/commit/AddFilesToStateStore.java +++ b/java/ingest/ingest-runner/src/main/java/sleeper/ingest/runner/impl/commit/AddFilesToStateStore.java @@ -26,8 +26,7 @@ import sleeper.core.statestore.FileReference; import sleeper.core.statestore.StateStore; import sleeper.core.statestore.StateStoreException; -import sleeper.core.statestore.commit.StateStoreCommitRequestInS3; -import sleeper.core.statestore.commit.StateStoreCommitRequestInS3SerDe; +import sleeper.core.statestore.commit.StateStoreCommitRequestInS3Uploader; import sleeper.ingest.core.job.commit.IngestAddFilesCommitRequest; import sleeper.ingest.core.job.commit.IngestAddFilesCommitRequestSerDe; import sleeper.ingest.core.job.status.IngestJobAddedFilesEvent; @@ -38,7 +37,6 @@ import java.util.function.Consumer; import java.util.function.Supplier; -import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.DATA_BUCKET; import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_QUEUE_URL; @FunctionalInterface @@ -75,6 +73,7 @@ static AddFilesToStateStore bySqs( Supplier s3FilenameSupplier, Consumer requestConfig) { IngestAddFilesCommitRequestSerDe serDe = new IngestAddFilesCommitRequestSerDe(); + StateStoreCommitRequestInS3Uploader s3Uploader = new StateStoreCommitRequestInS3Uploader(instanceProperties, s3Client::putObject, s3FilenameSupplier); return references -> { IngestAddFilesCommitRequest.Builder requestBuilder = IngestAddFilesCommitRequest.builder() .fileReferences(references); @@ -82,13 +81,7 @@ static AddFilesToStateStore bySqs( IngestAddFilesCommitRequest request = requestBuilder.build(); String json = serDe.toJson(request); LOGGER.debug("Sending asynchronous request to state store committer: {}", request); - // Store in S3 if the request will not fit in an SQS message - if (json.length() > 262144) { - String s3Key = StateStoreCommitRequestInS3.createFileS3Key(request.getTableId(), s3FilenameSupplier.get()); - s3Client.putObject(instanceProperties.get(DATA_BUCKET), s3Key, json); - json = new StateStoreCommitRequestInS3SerDe().toJson(new StateStoreCommitRequestInS3(s3Key)); - LOGGER.info("Request to add files was too big for an SQS message. Will submit a reference to file in data bucket: {}", s3Key); - } + json = s3Uploader.uploadAndWrapIfTooBig(request.getTableId(), json); sqsClient.sendMessage(new SendMessageRequest() .withQueueUrl(instanceProperties.get(STATESTORE_COMMITTER_QUEUE_URL)) .withMessageBody(json) From 39e154fe60f8032c86d39dbbb033206279b78447 Mon Sep 17 00:00:00 2001 From: rtjd6554 <174791724+rtjd6554@users.noreply.github.com> Date: Mon, 11 Nov 2024 11:16:15 +0000 Subject: [PATCH 09/20] Correct typo --- code-style/dependency-check-suppressions.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/code-style/dependency-check-suppressions.xml b/code-style/dependency-check-suppressions.xml index 1b41670911..2ac611607b 100644 --- a/code-style/dependency-check-suppressions.xml +++ b/code-style/dependency-check-suppressions.xml @@ -288,7 +288,7 @@ We're not using the HttpURI class directly, and the CVE states that Jetty is not vulnerable unless you use that directly. ]]> - ^^pkg:maven/org\.eclipse\.jetty\*$ + ^pkg:maven/org\.eclipse\.jetty\*$ CVE-2024-6763 From f07e1c2047e6e7500efec18afd8c077a47a73644 Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Mon, 11 Nov 2024 11:22:05 +0000 Subject: [PATCH 10/20] Use StateStoreCommitRequestInS3Uploader in GarbageCollector --- .../StateStoreCommitRequestInS3Uploader.java | 12 ++++- ...ateStoreCommitRequestInS3UploaderTest.java | 27 ++++++++++ .../garbagecollector/GarbageCollector.java | 5 +- .../GarbageCollectorLambda.java | 3 +- .../GarbageCollectorS3IT.java | 53 ++++++++++++++++++- .../impl/commit/AddFilesToStateStore.java | 8 ++- .../impl/commit/AddFilesToStateStoreIT.java | 5 +- 7 files changed, 101 insertions(+), 12 deletions(-) create mode 100644 java/core/src/test/java/sleeper/core/statestore/commit/StateStoreCommitRequestInS3UploaderTest.java diff --git a/java/core/src/main/java/sleeper/core/statestore/commit/StateStoreCommitRequestInS3Uploader.java b/java/core/src/main/java/sleeper/core/statestore/commit/StateStoreCommitRequestInS3Uploader.java index 61ca3260e4..5b82753a28 100644 --- a/java/core/src/main/java/sleeper/core/statestore/commit/StateStoreCommitRequestInS3Uploader.java +++ b/java/core/src/main/java/sleeper/core/statestore/commit/StateStoreCommitRequestInS3Uploader.java @@ -20,6 +20,7 @@ import sleeper.core.properties.instance.InstanceProperties; +import java.util.UUID; import java.util.function.Supplier; import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.DATA_BUCKET; @@ -29,15 +30,22 @@ */ public class StateStoreCommitRequestInS3Uploader { public static final Logger LOGGER = LoggerFactory.getLogger(StateStoreCommitRequestInS3Uploader.class); + public static final int MAX_JSON_LENGTH = 262144; private final InstanceProperties instanceProperties; private final Client client; private final Supplier filenameSupplier; + private final int maxJsonLength; private final StateStoreCommitRequestInS3SerDe serDe = new StateStoreCommitRequestInS3SerDe(); - public StateStoreCommitRequestInS3Uploader(InstanceProperties instanceProperties, Client client, Supplier filenameSupplier) { + public StateStoreCommitRequestInS3Uploader(InstanceProperties instanceProperties, Client client) { + this(instanceProperties, client, MAX_JSON_LENGTH, () -> UUID.randomUUID().toString()); + } + + public StateStoreCommitRequestInS3Uploader(InstanceProperties instanceProperties, Client client, int maxJsonLength, Supplier filenameSupplier) { this.instanceProperties = instanceProperties; this.client = client; + this.maxJsonLength = maxJsonLength; this.filenameSupplier = filenameSupplier; } @@ -51,7 +59,7 @@ public StateStoreCommitRequestInS3Uploader(InstanceProperties instanceProperties */ public String uploadAndWrapIfTooBig(String tableId, String commitRequestJson) { // Store in S3 if the request will not fit in an SQS message - if (commitRequestJson.length() > 262144) { + if (commitRequestJson.length() > maxJsonLength) { String s3Key = StateStoreCommitRequestInS3.createFileS3Key(tableId, filenameSupplier.get()); client.putObject(instanceProperties.get(DATA_BUCKET), s3Key, commitRequestJson); LOGGER.info("Request was too big for an SQS message. Will submit a reference to file in data bucket: {}", s3Key); diff --git a/java/core/src/test/java/sleeper/core/statestore/commit/StateStoreCommitRequestInS3UploaderTest.java b/java/core/src/test/java/sleeper/core/statestore/commit/StateStoreCommitRequestInS3UploaderTest.java new file mode 100644 index 0000000000..b50242a666 --- /dev/null +++ b/java/core/src/test/java/sleeper/core/statestore/commit/StateStoreCommitRequestInS3UploaderTest.java @@ -0,0 +1,27 @@ +/* + * Copyright 2022-2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package sleeper.core.statestore.commit; + +import org.junit.jupiter.api.Test; + +public class StateStoreCommitRequestInS3UploaderTest { + + @Test + void shouldUploadFileWhenTooBig() { + + } + +} diff --git a/java/garbage-collector/src/main/java/sleeper/garbagecollector/GarbageCollector.java b/java/garbage-collector/src/main/java/sleeper/garbagecollector/GarbageCollector.java index 0f800eeae6..8bce108b81 100644 --- a/java/garbage-collector/src/main/java/sleeper/garbagecollector/GarbageCollector.java +++ b/java/garbage-collector/src/main/java/sleeper/garbagecollector/GarbageCollector.java @@ -30,6 +30,7 @@ import sleeper.core.statestore.StateStoreProvider; import sleeper.core.statestore.commit.GarbageCollectionCommitRequest; import sleeper.core.statestore.commit.GarbageCollectionCommitRequestSerDe; +import sleeper.core.statestore.commit.StateStoreCommitRequestInS3Uploader; import sleeper.core.table.TableStatus; import sleeper.core.util.LoggedDuration; import sleeper.garbagecollector.FailedGarbageCollectionException.TableFailures; @@ -193,11 +194,11 @@ public interface SendAsyncCommit { void sendCommit(GarbageCollectionCommitRequest commitRequest); } - public static SendAsyncCommit sendAsyncCommit(InstanceProperties instanceProperties, AmazonSQS sqs) { + public static SendAsyncCommit sendAsyncCommit(InstanceProperties instanceProperties, AmazonSQS sqs, StateStoreCommitRequestInS3Uploader s3Uploader) { GarbageCollectionCommitRequestSerDe serDe = new GarbageCollectionCommitRequestSerDe(); return request -> sqs.sendMessage(new SendMessageRequest() .withQueueUrl(instanceProperties.get(STATESTORE_COMMITTER_QUEUE_URL)) - .withMessageBody(serDe.toJson(request)) + .withMessageBody(s3Uploader.uploadAndWrapIfTooBig(request.getTableId(), serDe.toJson(request))) .withMessageGroupId(request.getTableId()) .withMessageDeduplicationId(UUID.randomUUID().toString())); } diff --git a/java/garbage-collector/src/main/java/sleeper/garbagecollector/GarbageCollectorLambda.java b/java/garbage-collector/src/main/java/sleeper/garbagecollector/GarbageCollectorLambda.java index 67cf9133d2..2958bfb65a 100644 --- a/java/garbage-collector/src/main/java/sleeper/garbagecollector/GarbageCollectorLambda.java +++ b/java/garbage-collector/src/main/java/sleeper/garbagecollector/GarbageCollectorLambda.java @@ -38,6 +38,7 @@ import sleeper.core.properties.table.TableProperties; import sleeper.core.properties.table.TablePropertiesProvider; import sleeper.core.statestore.StateStoreProvider; +import sleeper.core.statestore.commit.StateStoreCommitRequestInS3Uploader; import sleeper.core.table.TableStatus; import sleeper.core.util.LoggedDuration; import sleeper.garbagecollector.FailedGarbageCollectionException.TableFailures; @@ -84,7 +85,7 @@ public GarbageCollectorLambda() { StateStoreProvider stateStoreProvider = StateStoreFactory.createProvider(instanceProperties, s3Client, dynamoDBClient, conf); garbageCollector = new GarbageCollector(deleteFileAndSketches(conf), instanceProperties, stateStoreProvider, - sendAsyncCommit(instanceProperties, sqsClient)); + sendAsyncCommit(instanceProperties, sqsClient, new StateStoreCommitRequestInS3Uploader(instanceProperties, s3Client::putObject))); } @Override diff --git a/java/garbage-collector/src/test/java/sleeper/garbagecollector/GarbageCollectorS3IT.java b/java/garbage-collector/src/test/java/sleeper/garbagecollector/GarbageCollectorS3IT.java index 865fb18ef8..57a21db064 100644 --- a/java/garbage-collector/src/test/java/sleeper/garbagecollector/GarbageCollectorS3IT.java +++ b/java/garbage-collector/src/test/java/sleeper/garbagecollector/GarbageCollectorS3IT.java @@ -46,6 +46,9 @@ import sleeper.core.statestore.StateStore; import sleeper.core.statestore.commit.GarbageCollectionCommitRequest; import sleeper.core.statestore.commit.GarbageCollectionCommitRequestSerDe; +import sleeper.core.statestore.commit.StateStoreCommitRequestInS3; +import sleeper.core.statestore.commit.StateStoreCommitRequestInS3SerDe; +import sleeper.core.statestore.commit.StateStoreCommitRequestInS3Uploader; import sleeper.core.statestore.testutils.FixedStateStoreProvider; import sleeper.parquet.utils.HadoopConfigurationLocalStackUtils; @@ -168,6 +171,40 @@ void shouldSendAsyncCommit() throws Exception { .containsExactly(new GarbageCollectionCommitRequest(tableProperties.get(TABLE_ID), List.of(oldFile.getFilename()))); } + @Test + void shouldUploadCommitToS3IfTooBig() throws Exception { + // Given + TableProperties tableProperties = createTableWithGCDelay(instanceProperties, 10); + tableProperties.set(GARBAGE_COLLECTOR_ASYNC_COMMIT, "true"); + Instant currentTime = Instant.parse("2023-06-28T13:46:00Z"); + Instant oldEnoughTime = currentTime.minus(Duration.ofMinutes(11)); + StateStore stateStore = setupStateStoreAndFixTime(oldEnoughTime); + // Perform a compaction on an existing file to create a readyForGC file + s3Client.putObject(testBucket, "old-file.parquet", "abc"); + s3Client.putObject(testBucket, "new-file.parquet", "def"); + FileReference oldFile = factory.rootFile("s3a://" + testBucket + "/old-file.parquet", 100L); + FileReference newFile = factory.rootFile("s3a://" + testBucket + "/new-file.parquet", 100L); + stateStore.addFile(oldFile); + stateStore.assignJobIds(List.of( + assignJobOnPartitionToFiles("test-job", "root", List.of(oldFile.getFilename())))); + stateStore.atomicallyReplaceFileReferencesWithNewOnes(List.of(replaceJobFileReferences( + "test-job", "root", List.of(oldFile.getFilename()), newFile))); + + // When + createGarbageCollectorWithMaxCommitLength(1, instanceProperties, tableProperties, stateStore) + .runAtTime(currentTime, List.of(tableProperties)); + + // Then + assertThat(s3Client.doesObjectExist(testBucket, "old-file.parquet")).isFalse(); + assertThat(s3Client.doesObjectExist(testBucket, "new-file.parquet")).isTrue(); + assertThat(stateStore.getAllFilesWithMaxUnreferenced(10)) + .isEqualTo(activeAndReadyForGCFilesReport(oldEnoughTime, List.of(newFile), List.of(oldFile.getFilename()))); + assertThat(receiveS3CommitRequests()) + .map(request -> s3Client.getObjectAsString(testBucket, request.getKeyInS3())) + .map(new GarbageCollectionCommitRequestSerDe()::fromJson) + .containsExactly(new GarbageCollectionCommitRequest(tableProperties.get(TABLE_ID), List.of(oldFile.getFilename()))); + } + private InstanceProperties createInstanceProperties() { InstanceProperties instanceProperties = createTestInstanceProperties(); instanceProperties.set(STATESTORE_COMMITTER_QUEUE_URL, createFifoQueueGetUrl()); @@ -183,8 +220,16 @@ private TableProperties createTableWithGCDelay(InstanceProperties instanceProper } private GarbageCollector createGarbageCollector(InstanceProperties instanceProperties, TableProperties tableProperties, StateStore stateStore) { + return createGarbageCollectorWithMaxCommitLength( + StateStoreCommitRequestInS3Uploader.MAX_JSON_LENGTH, + instanceProperties, tableProperties, stateStore); + } + + private GarbageCollector createGarbageCollectorWithMaxCommitLength(int maxLength, InstanceProperties instanceProperties, TableProperties tableProperties, StateStore stateStore) { return new GarbageCollector(deleteFileAndSketches(configuration), instanceProperties, - new FixedStateStoreProvider(tableProperties, stateStore), sendAsyncCommit(instanceProperties, sqsClient)); + new FixedStateStoreProvider(tableProperties, stateStore), + sendAsyncCommit(instanceProperties, sqsClient, + new StateStoreCommitRequestInS3Uploader(instanceProperties, s3Client::putObject, maxLength, () -> UUID.randomUUID().toString()))); } private static Schema getSchema() { @@ -206,6 +251,12 @@ private List receiveGarbageCollectionCommitReque .collect(Collectors.toList()); } + private List receiveS3CommitRequests() { + return receiveCommitMessages().stream() + .map(message -> new StateStoreCommitRequestInS3SerDe().fromJson(message.getBody())) + .collect(Collectors.toList()); + } + private List receiveCommitMessages() { ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest() .withQueueUrl(instanceProperties.get(STATESTORE_COMMITTER_QUEUE_URL)) diff --git a/java/ingest/ingest-runner/src/main/java/sleeper/ingest/runner/impl/commit/AddFilesToStateStore.java b/java/ingest/ingest-runner/src/main/java/sleeper/ingest/runner/impl/commit/AddFilesToStateStore.java index 6aecdd68bf..000c9e1473 100644 --- a/java/ingest/ingest-runner/src/main/java/sleeper/ingest/runner/impl/commit/AddFilesToStateStore.java +++ b/java/ingest/ingest-runner/src/main/java/sleeper/ingest/runner/impl/commit/AddFilesToStateStore.java @@ -35,7 +35,6 @@ import java.util.List; import java.util.UUID; import java.util.function.Consumer; -import java.util.function.Supplier; import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_QUEUE_URL; @@ -65,15 +64,14 @@ static AddFilesToStateStore synchronous( static AddFilesToStateStore bySqs( AmazonSQS sqsClient, AmazonS3 s3Client, InstanceProperties instanceProperties, Consumer requestConfig) { - return bySqs(sqsClient, s3Client, instanceProperties, () -> UUID.randomUUID().toString(), requestConfig); + return bySqs(sqsClient, instanceProperties, new StateStoreCommitRequestInS3Uploader(instanceProperties, s3Client::putObject), requestConfig); } static AddFilesToStateStore bySqs( - AmazonSQS sqsClient, AmazonS3 s3Client, InstanceProperties instanceProperties, - Supplier s3FilenameSupplier, + AmazonSQS sqsClient, InstanceProperties instanceProperties, + StateStoreCommitRequestInS3Uploader s3Uploader, Consumer requestConfig) { IngestAddFilesCommitRequestSerDe serDe = new IngestAddFilesCommitRequestSerDe(); - StateStoreCommitRequestInS3Uploader s3Uploader = new StateStoreCommitRequestInS3Uploader(instanceProperties, s3Client::putObject, s3FilenameSupplier); return references -> { IngestAddFilesCommitRequest.Builder requestBuilder = IngestAddFilesCommitRequest.builder() .fileReferences(references); diff --git a/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/impl/commit/AddFilesToStateStoreIT.java b/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/impl/commit/AddFilesToStateStoreIT.java index 7988cc0a4c..9c2b56ec68 100644 --- a/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/impl/commit/AddFilesToStateStoreIT.java +++ b/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/impl/commit/AddFilesToStateStoreIT.java @@ -40,6 +40,7 @@ import sleeper.core.statestore.FileReferenceFactory; import sleeper.core.statestore.commit.StateStoreCommitRequestInS3; import sleeper.core.statestore.commit.StateStoreCommitRequestInS3SerDe; +import sleeper.core.statestore.commit.StateStoreCommitRequestInS3Uploader; import sleeper.ingest.core.job.commit.IngestAddFilesCommitRequest; import sleeper.ingest.core.job.commit.IngestAddFilesCommitRequestSerDe; @@ -130,7 +131,9 @@ private AddFilesToStateStore bySqs() { } private AddFilesToStateStore bySqs(Supplier filenameSupplier) { - return AddFilesToStateStore.bySqs(sqs, s3, instanceProperties, filenameSupplier, + return AddFilesToStateStore.bySqs(sqs, instanceProperties, + new StateStoreCommitRequestInS3Uploader(instanceProperties, s3::putObject, + StateStoreCommitRequestInS3Uploader.MAX_JSON_LENGTH, filenameSupplier), request -> request.tableId(tableProperties.get(TABLE_ID))); } From d83cd4462836f8597bac3478dc2dd3117e2fdfe1 Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Mon, 11 Nov 2024 11:34:29 +0000 Subject: [PATCH 11/20] Add StateStoreCommitRequestInS3UploaderTest --- ...ateStoreCommitRequestInS3UploaderTest.java | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/java/core/src/test/java/sleeper/core/statestore/commit/StateStoreCommitRequestInS3UploaderTest.java b/java/core/src/test/java/sleeper/core/statestore/commit/StateStoreCommitRequestInS3UploaderTest.java index b50242a666..8a78876b87 100644 --- a/java/core/src/test/java/sleeper/core/statestore/commit/StateStoreCommitRequestInS3UploaderTest.java +++ b/java/core/src/test/java/sleeper/core/statestore/commit/StateStoreCommitRequestInS3UploaderTest.java @@ -17,11 +17,60 @@ import org.junit.jupiter.api.Test; +import sleeper.core.properties.instance.InstanceProperties; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.DATA_BUCKET; + public class StateStoreCommitRequestInS3UploaderTest { + private final InstanceProperties instanceProperties = new InstanceProperties(); + private final Map objectsByBucketAndKey = new HashMap<>(); + @Test void shouldUploadFileWhenTooBig() { + // Given + String commitJson = "{\"type\":\"test\",\"tableId\":\"test-table\"}"; + instanceProperties.set(DATA_BUCKET, "test-bucket"); + StateStoreCommitRequestInS3Uploader uploader = uploaderWithMaxLengthAndFilenames(10, "commit"); + + // When + String resultJson = uploader.uploadAndWrapIfTooBig("test-table", commitJson); + + // Then + assertThat(new StateStoreCommitRequestInS3SerDe().fromJson(resultJson)) + .isEqualTo(new StateStoreCommitRequestInS3("test-table/statestore/commitrequests/commit.json")); + assertThat(objectsByBucketAndKey) + .isEqualTo(Map.of("test-bucket/test-table/statestore/commitrequests/commit.json", commitJson)); + } + + @Test + void shouldNotUploadFileWhenMeetsMax() { + // Given + String commitJson = "{\"type\":\"test\",\"tableId\":\"test-table\"}"; + instanceProperties.set(DATA_BUCKET, "test-bucket"); + StateStoreCommitRequestInS3Uploader uploader = uploaderWithMaxLengthAndFilenames(commitJson.length(), "commit"); + + // When + String resultJson = uploader.uploadAndWrapIfTooBig("test-table", commitJson); + + // Then + assertThat(resultJson).isEqualTo(commitJson); + assertThat(objectsByBucketAndKey).isEmpty(); + } + + private StateStoreCommitRequestInS3Uploader uploaderWithMaxLengthAndFilenames(int maxLength, String... filenames) { + return new StateStoreCommitRequestInS3Uploader(instanceProperties, client(), maxLength, List.of(filenames).iterator()::next); + } + private StateStoreCommitRequestInS3Uploader.Client client() { + return (bucket, key, content) -> { + objectsByBucketAndKey.put(bucket + "/" + key, content); + }; } } From 48c1d5f048f0bbb6024039c48adaf4b433552241 Mon Sep 17 00:00:00 2001 From: rtjd6554 <174791724+rtjd6554@users.noreply.github.com> Date: Mon, 11 Nov 2024 11:34:36 +0000 Subject: [PATCH 12/20] Regex update --- code-style/dependency-check-suppressions.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/code-style/dependency-check-suppressions.xml b/code-style/dependency-check-suppressions.xml index 2ac611607b..bd8c27ee67 100644 --- a/code-style/dependency-check-suppressions.xml +++ b/code-style/dependency-check-suppressions.xml @@ -288,7 +288,7 @@ We're not using the HttpURI class directly, and the CVE states that Jetty is not vulnerable unless you use that directly. ]]> - ^pkg:maven/org\.eclipse\.jetty\*$ + ^pkg:maven/org\.eclipse\.jetty\@.*$ CVE-2024-6763 From b3511c2f4767cf8f1eaaaa7853abb8892843a73d Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Mon, 11 Nov 2024 11:40:56 +0000 Subject: [PATCH 13/20] Move reading VPC check property out of VpcStack --- .../cdk/src/main/java/sleeper/cdk/SleeperCdkApp.java | 12 +++++++++++- .../src/main/java/sleeper/cdk/stack/VpcStack.java | 10 ---------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/java/cdk/src/main/java/sleeper/cdk/SleeperCdkApp.java b/java/cdk/src/main/java/sleeper/cdk/SleeperCdkApp.java index b8e3d4fdcd..21bf876032 100644 --- a/java/cdk/src/main/java/sleeper/cdk/SleeperCdkApp.java +++ b/java/cdk/src/main/java/sleeper/cdk/SleeperCdkApp.java @@ -15,6 +15,8 @@ */ package sleeper.cdk; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import software.amazon.awscdk.App; import software.amazon.awscdk.AppProps; import software.amazon.awscdk.Environment; @@ -80,11 +82,14 @@ import static sleeper.core.properties.instance.CommonProperty.ID; import static sleeper.core.properties.instance.CommonProperty.OPTIONAL_STACKS; import static sleeper.core.properties.instance.CommonProperty.REGION; +import static sleeper.core.properties.instance.CommonProperty.VPC_ENDPOINT_CHECK; /** * Deploys an instance of Sleeper, including any configured optional stacks. */ public class SleeperCdkApp extends Stack { + public static final Logger LOGGER = LoggerFactory.getLogger(SleeperCdkApp.class); + private final InstanceProperties instanceProperties; private final BuiltJars jars; private final App app; @@ -121,7 +126,12 @@ public void create() { LoggingStack loggingStack = new LoggingStack(this, "Logging", instanceProperties); // Stack for Checking VPC configuration - new VpcStack(this, "Vpc", instanceProperties, jars, loggingStack); + if (instanceProperties.getBoolean(VPC_ENDPOINT_CHECK)) { + new VpcStack(this, "Vpc", instanceProperties, jars, loggingStack); + } else { + LOGGER.warn("Skipping VPC check as requested by the user. Be aware that VPCs that don't have an S3 endpoint can result " + + "in very significant NAT charges."); + } // Topic stack TopicStack topicStack = new TopicStack(this, "Topic", instanceProperties); diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/VpcStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/VpcStack.java index 40cfbe5950..95a0155311 100644 --- a/java/cdk/src/main/java/sleeper/cdk/stack/VpcStack.java +++ b/java/cdk/src/main/java/sleeper/cdk/stack/VpcStack.java @@ -16,8 +16,6 @@ package sleeper.cdk.stack; import com.google.common.collect.Lists; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import software.amazon.awscdk.CustomResource; import software.amazon.awscdk.CustomResourceProps; import software.amazon.awscdk.NestedStack; @@ -41,21 +39,13 @@ import java.util.Map; import static sleeper.core.properties.instance.CommonProperty.REGION; -import static sleeper.core.properties.instance.CommonProperty.VPC_ENDPOINT_CHECK; import static sleeper.core.properties.instance.CommonProperty.VPC_ID; public class VpcStack extends NestedStack { - private static final Logger LOGGER = LoggerFactory.getLogger(VpcStack.class); public VpcStack(Construct scope, String id, InstanceProperties instanceProperties, BuiltJars jars, LoggingStack logging) { super(scope, id); - if (!instanceProperties.getBoolean(VPC_ENDPOINT_CHECK)) { - LOGGER.warn("Skipping VPC check as requested by the user. Be aware that VPCs that don't have an S3 endpoint can result " - + "in very significant NAT charges."); - return; - } - // Jars bucket IBucket jarsBucket = Bucket.fromBucketName(this, "JarsBucket", jars.bucketName()); LambdaCode lambdaCode = jars.lambdaCode(jarsBucket); From 21b7f56e78190db7b945490fd37e1da4fc079167 Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Mon, 11 Nov 2024 11:48:27 +0000 Subject: [PATCH 14/20] Raise polling timeout waiting for state store committer logs --- .../dsl/statestore/SystemTestStateStore.java | 5 ++++- .../SystemTestStateStoreFakeCommits.java | 14 ++++++++++---- .../SystemTestStateStoreFakeCommitsTest.java | 7 +++---- .../systemtest/suite/StateStoreCommitterST.java | 4 +--- .../suite/StateStoreCommitterThroughputST.java | 15 +++++++-------- 5 files changed, 25 insertions(+), 20 deletions(-) diff --git a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/statestore/SystemTestStateStore.java b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/statestore/SystemTestStateStore.java index 41cbe49e85..958e11ac32 100644 --- a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/statestore/SystemTestStateStore.java +++ b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/statestore/SystemTestStateStore.java @@ -27,6 +27,7 @@ import sleeper.systemtest.dsl.instance.SystemTestInstanceContext; import sleeper.systemtest.dsl.snapshot.SnapshotsDriver; import sleeper.systemtest.dsl.snapshot.WaitForSnapshot; +import sleeper.systemtest.dsl.util.PollWithRetriesDriver; import java.time.Instant; import java.util.Map; @@ -43,6 +44,7 @@ public class SystemTestStateStore { private final StateStoreCommitterDriver driver; private final StateStoreCommitterLogsDriver logsDriver; private final SnapshotsDriver snapshotsDriver; + private final PollWithRetriesDriver pollDriver; public SystemTestStateStore(SystemTestContext context) { this.context = context; @@ -50,10 +52,11 @@ public SystemTestStateStore(SystemTestContext context) { driver = adminDrivers.stateStoreCommitter(context); logsDriver = adminDrivers.stateStoreCommitterLogs(context); snapshotsDriver = adminDrivers.snapshots(); + pollDriver = adminDrivers.pollWithRetries(); } public SystemTestStateStoreFakeCommits fakeCommits() { - return new SystemTestStateStoreFakeCommits(context, driver, logsDriver); + return new SystemTestStateStoreFakeCommits(context, driver, logsDriver, pollDriver); } public double commitsPerSecondForTable() { diff --git a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/statestore/SystemTestStateStoreFakeCommits.java b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/statestore/SystemTestStateStoreFakeCommits.java index dbf9c5bf95..a7ecb753d2 100644 --- a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/statestore/SystemTestStateStoreFakeCommits.java +++ b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/statestore/SystemTestStateStoreFakeCommits.java @@ -18,10 +18,11 @@ import sleeper.core.properties.instance.InstanceProperties; import sleeper.core.statestore.StateStore; import sleeper.core.statestore.StateStoreException; -import sleeper.core.util.PollWithRetries; import sleeper.systemtest.dsl.SystemTestContext; import sleeper.systemtest.dsl.instance.SystemTestInstanceContext; +import sleeper.systemtest.dsl.util.PollWithRetriesDriver; +import java.time.Duration; import java.time.Instant; import java.util.List; import java.util.Map; @@ -34,6 +35,7 @@ public class SystemTestStateStoreFakeCommits { private final SystemTestInstanceContext instance; private final StateStoreCommitterDriver driver; + private final PollWithRetriesDriver pollDriver; private final WaitForStateStoreCommitLogs waiter; private final Map waitForNumCommitsByTableId = new ConcurrentHashMap<>(); private final Instant getRunsAfterTime; @@ -41,8 +43,10 @@ public class SystemTestStateStoreFakeCommits { public SystemTestStateStoreFakeCommits( SystemTestContext context, StateStoreCommitterDriver driver, - StateStoreCommitterLogsDriver logsDriver) { + StateStoreCommitterLogsDriver logsDriver, + PollWithRetriesDriver pollDriver) { this.driver = driver; + this.pollDriver = pollDriver; instance = context.instance(); waiter = new WaitForStateStoreCommitLogs(logsDriver); getRunsAfterTime = context.reporting().getRecordingStartTime(); @@ -77,8 +81,10 @@ public SystemTestStateStoreFakeCommits send(StateStoreCommitMessage.Commit commi return this; } - public SystemTestStateStoreFakeCommits waitForCommitLogs(PollWithRetries poll) throws InterruptedException { - waiter.waitForCommitLogs(poll, waitForNumCommitsByTableId, getRunsAfterTime); + public SystemTestStateStoreFakeCommits waitForCommitLogs() throws InterruptedException { + waiter.waitForCommitLogs( + pollDriver.pollWithIntervalAndTimeout(Duration.ofSeconds(20), Duration.ofMinutes(5)), + waitForNumCommitsByTableId, getRunsAfterTime); return this; } diff --git a/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/statestore/SystemTestStateStoreFakeCommitsTest.java b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/statestore/SystemTestStateStoreFakeCommitsTest.java index 408dd80e3b..3b0c7795c9 100644 --- a/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/statestore/SystemTestStateStoreFakeCommitsTest.java +++ b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/statestore/SystemTestStateStoreFakeCommitsTest.java @@ -18,7 +18,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import sleeper.core.util.PollWithRetries; import sleeper.systemtest.dsl.SleeperSystemTest; import sleeper.systemtest.dsl.testutil.InMemoryDslTest; import sleeper.systemtest.dsl.testutil.InMemorySystemTestDrivers; @@ -47,7 +46,7 @@ void shouldSendOneFileCommit(SleeperSystemTest sleeper) throws Exception { // When sleeper.stateStore().fakeCommits() .send(StateStoreCommitMessage.addPartitionFile("root", "file.parquet", 100)) - .waitForCommitLogs(PollWithRetries.noRetries()); + .waitForCommitLogs(); // Then assertThat(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all())) @@ -63,7 +62,7 @@ void shouldSendManyFileCommits(SleeperSystemTest sleeper) throws Exception { sleeper.stateStore().fakeCommits() .sendBatched(LongStream.rangeClosed(1, 1000) .mapToObj(i -> StateStoreCommitMessage.addPartitionFile("root", "file-" + i + ".parquet", i))) - .waitForCommitLogs(PollWithRetries.noRetries()); + .waitForCommitLogs(); // Then assertThat(sleeper.tableFiles().references()).hasSize(1000); @@ -78,7 +77,7 @@ void shouldWaitForCommitWhenCommitWasMadeButNoRunStartOrFinishLogsWereMade(Sleep committer.addFakeCommits(sleeper, 1); // When / Then - assertThatCode(() -> commitsDsl.waitForCommitLogs(PollWithRetries.noRetries())) + assertThatCode(() -> commitsDsl.waitForCommitLogs()) .doesNotThrowAnyException(); } diff --git a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/StateStoreCommitterST.java b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/StateStoreCommitterST.java index 229d7c31c2..40a43e7482 100644 --- a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/StateStoreCommitterST.java +++ b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/StateStoreCommitterST.java @@ -21,12 +21,10 @@ import sleeper.core.partition.PartitionTree; import sleeper.core.partition.PartitionsBuilder; import sleeper.core.statestore.FileReferenceFactory; -import sleeper.core.util.PollWithRetries; import sleeper.systemtest.dsl.SleeperSystemTest; import sleeper.systemtest.dsl.statestore.StateStoreCommitMessage; import sleeper.systemtest.suite.testutil.SystemTest; -import java.time.Duration; import java.util.stream.IntStream; import java.util.stream.LongStream; @@ -55,7 +53,7 @@ void shouldAddManyFiles(SleeperSystemTest sleeper) throws Exception { .sendBatched(IntStream.rangeClosed(1, 1000) .mapToObj(i -> fileFactory.rootFile("file-" + i + ".parquet", i)) .map(StateStoreCommitMessage::addFile)) - .waitForCommitLogs(PollWithRetries.intervalAndPollingTimeout(Duration.ofSeconds(20), Duration.ofMinutes(3))); + .waitForCommitLogs(); // Then assertThat(sleeper.tableFiles().recordsByFilename()).isEqualTo( diff --git a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/StateStoreCommitterThroughputST.java b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/StateStoreCommitterThroughputST.java index f27cbac1fe..19143febeb 100644 --- a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/StateStoreCommitterThroughputST.java +++ b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/StateStoreCommitterThroughputST.java @@ -21,7 +21,6 @@ import sleeper.core.partition.PartitionTree; import sleeper.core.partition.PartitionsBuilder; import sleeper.core.statestore.FileReferenceFactory; -import sleeper.core.util.PollWithRetries; import sleeper.systemtest.dsl.SleeperSystemTest; import sleeper.systemtest.dsl.statestore.StateStoreCommitMessage; import sleeper.systemtest.suite.testutil.Slow; @@ -65,7 +64,7 @@ void shouldMeetExpectedThroughputWhenCommittingFilesWithNoJobOnOneTable(SleeperS .sendBatched(IntStream.rangeClosed(1, 1000) .mapToObj(i -> fileFactory.rootFile(filename(i), i)) .map(StateStoreCommitMessage::addFile)) - .waitForCommitLogs(PollWithRetries.intervalAndPollingTimeout(Duration.ofSeconds(20), Duration.ofMinutes(3))); + .waitForCommitLogs(); // Then assertThat(sleeper.tableFiles().references()).hasSize(1000); @@ -86,7 +85,7 @@ void shouldMeetExpectedThroughputWhenCommittingFilesWithIngestJobOnOneTable(Slee .sendBatched(IntStream.rangeClosed(1, 1000) .mapToObj(i -> fileFactory.rootFile(filename(i), i)) .map(StateStoreCommitMessage::addFileWithJob)) - .waitForCommitLogs(PollWithRetries.intervalAndPollingTimeout(Duration.ofSeconds(20), Duration.ofMinutes(3))); + .waitForCommitLogs(); // Then assertThat(sleeper.tableFiles().references()).hasSize(1000); @@ -108,7 +107,7 @@ void shouldMeetExpectedThroughputWhenCommittingFilesWithNoJobOnMultipleTables(Sl .sendBatchedForEachTable(IntStream.rangeClosed(1, 1000) .mapToObj(i -> fileFactory.rootFile(filename(i), i)) .map(StateStoreCommitMessage::addFile)) - .waitForCommitLogs(PollWithRetries.intervalAndPollingTimeout(Duration.ofSeconds(20), Duration.ofMinutes(3))); + .waitForCommitLogs(); // Then assertThat(sleeper.tableFiles().referencesByTable()) @@ -138,7 +137,7 @@ void shouldMeetExpectedThroughputWhenCommittingCompactionJobIdAssignment(Sleeper sleeper.stateStore().fakeCommits() .sendBatched(IntStream.rangeClosed(1, 1000) .mapToObj(i -> factory -> factory.assignJobOnPartitionToFiles(jobId(i), "root", List.of(filename(i))))) - .waitForCommitLogs(PollWithRetries.intervalAndPollingTimeout(Duration.ofSeconds(20), Duration.ofMinutes(3))); + .waitForCommitLogs(); // Then assertThat(printFiles(partitions, sleeper.tableFiles().all())) @@ -175,7 +174,7 @@ void shouldMeetExpectedThroughputWhenCommittingCompaction(SleeperSystemTest slee .mapToObj(i -> factory -> factory.commitCompactionForPartitionOnTaskInRun( jobId(i), "root", List.of(filename(i), filename(i + 1000)), "test-task", jobRunId(i), summary(startTime(i), Duration.ofMinutes(1), i * 2, i * 2)))) - .waitForCommitLogs(PollWithRetries.intervalAndPollingTimeout(Duration.ofSeconds(20), Duration.ofMinutes(3))); + .waitForCommitLogs(); // Then assertThat(printFiles(partitions, sleeper.tableFiles().all())) @@ -218,7 +217,7 @@ void shouldMeetExpectedThroughputWhenCommittingDeletedFiles(SleeperSystemTest sl sleeper.stateStore().fakeCommits() .sendBatched(IntStream.rangeClosed(1, 1000) .mapToObj(i -> factory -> factory.filesDeleted(List.of(filename(i), filename(i + 1000))))) - .waitForCommitLogs(PollWithRetries.intervalAndPollingTimeout(Duration.ofSeconds(20), Duration.ofMinutes(3))); + .waitForCommitLogs(); // Then assertThat(printFiles(partitions, sleeper.tableFiles().all())) @@ -252,7 +251,7 @@ void shouldMeetExpectedThroughputWhenPerformingManyOperationsOnMultipleTables(Sl jobId(i), "root", List.of(filename(i), filename(i + 1000)), "test-task", jobRunId(i), summary(startTime(i), Duration.ofMinutes(1), i * 2, i * 2)), factory -> factory.filesDeleted(List.of(filename(i), filename(i + 1000)))))) - .waitForCommitLogs(PollWithRetries.intervalAndPollingTimeout(Duration.ofSeconds(20), Duration.ofMinutes(3))); + .waitForCommitLogs(); // Then assertThat(sleeper.tableFiles().referencesByTable()) From a620cf1525a9d45b1d4c15c60f723d827ad11b51 Mon Sep 17 00:00:00 2001 From: rtjd6554 <174791724+rtjd6554@users.noreply.github.com> Date: Mon, 11 Nov 2024 11:55:56 +0000 Subject: [PATCH 15/20] Regex improvement --- code-style/dependency-check-suppressions.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/code-style/dependency-check-suppressions.xml b/code-style/dependency-check-suppressions.xml index bd8c27ee67..182a99ea84 100644 --- a/code-style/dependency-check-suppressions.xml +++ b/code-style/dependency-check-suppressions.xml @@ -288,7 +288,7 @@ We're not using the HttpURI class directly, and the CVE states that Jetty is not vulnerable unless you use that directly. ]]> - ^pkg:maven/org\.eclipse\.jetty\@.*$ + ^pkg:maven/org\.eclipse\.jetty/jetty-@.*$ CVE-2024-6763 From c2ed3b6ed53e2d148fd87c4ca4439b1ef1d713b5 Mon Sep 17 00:00:00 2001 From: rtjd6554 <174791724+rtjd6554@users.noreply.github.com> Date: Mon, 11 Nov 2024 12:00:10 +0000 Subject: [PATCH 16/20] Simplfy regex --- code-style/dependency-check-suppressions.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/code-style/dependency-check-suppressions.xml b/code-style/dependency-check-suppressions.xml index 182a99ea84..0b1d3ce96b 100644 --- a/code-style/dependency-check-suppressions.xml +++ b/code-style/dependency-check-suppressions.xml @@ -288,7 +288,7 @@ We're not using the HttpURI class directly, and the CVE states that Jetty is not vulnerable unless you use that directly. ]]> - ^pkg:maven/org\.eclipse\.jetty/jetty-@.*$ + ^pkg:maven/org\.eclipse\.jetty/.*$ CVE-2024-6763 From 1626fa98b83a0180a3697da96f41d49b41005091 Mon Sep 17 00:00:00 2001 From: rtjd6554 <174791724+rtjd6554@users.noreply.github.com> Date: Mon, 11 Nov 2024 12:11:58 +0000 Subject: [PATCH 17/20] Additional folder conditions --- code-style/dependency-check-suppressions.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/code-style/dependency-check-suppressions.xml b/code-style/dependency-check-suppressions.xml index 0b1d3ce96b..7d44844f41 100644 --- a/code-style/dependency-check-suppressions.xml +++ b/code-style/dependency-check-suppressions.xml @@ -288,7 +288,7 @@ We're not using the HttpURI class directly, and the CVE states that Jetty is not vulnerable unless you use that directly. ]]> - ^pkg:maven/org\.eclipse\.jetty/.*$ + ^pkg:maven/org\.eclipse\.jetty(/.*$|\.http2/http2.*$|\.websocket/websocket.*$) CVE-2024-6763 From b839ca11b29e4917d720fa7483b5515ddc3a044f Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 11 Nov 2024 12:23:56 +0000 Subject: [PATCH 18/20] Bump com.nimbusds:nimbus-jose-jwt from 9.45 to 9.46 in /java Bumps [com.nimbusds:nimbus-jose-jwt](https://bitbucket.org/connect2id/nimbus-jose-jwt) from 9.45 to 9.46. - [Changelog](https://bitbucket.org/connect2id/nimbus-jose-jwt/src/master/CHANGELOG.txt) - [Commits](https://bitbucket.org/connect2id/nimbus-jose-jwt/branches/compare/9.46..9.45) --- updated-dependencies: - dependency-name: com.nimbusds:nimbus-jose-jwt dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] --- java/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/pom.xml b/java/pom.xml index 124d1a97a5..aa588af921 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -97,7 +97,7 @@ 4.28.3 - 9.45 + 9.46 1.12.0 From ebedb396fc6bd4f92fd1fe08ad0418be8e3a70e3 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 11 Nov 2024 12:23:57 +0000 Subject: [PATCH 19/20] Bump aws-java-sdk-v2.version from 2.29.8 to 2.29.9 in /java Bumps `aws-java-sdk-v2.version` from 2.29.8 to 2.29.9. Updates `software.amazon.awssdk:s3` from 2.29.8 to 2.29.9 Updates `software.amazon.awssdk:s3-transfer-manager` from 2.29.8 to 2.29.9 Updates `software.amazon.awssdk:dynamodb` from 2.29.8 to 2.29.9 Updates `software.amazon.awssdk:sqs` from 2.29.8 to 2.29.9 Updates `software.amazon.awssdk:sts` from 2.29.8 to 2.29.9 Updates `software.amazon.awssdk:ecr` from 2.29.8 to 2.29.9 Updates `software.amazon.awssdk:cloudwatch` from 2.29.8 to 2.29.9 Updates `software.amazon.awssdk:cloudwatchevents` from 2.29.8 to 2.29.9 Updates `software.amazon.awssdk:cloudwatchlogs` from 2.29.8 to 2.29.9 Updates `software.amazon.awssdk:cloudformation` from 2.29.8 to 2.29.9 Updates `software.amazon.awssdk:lambda` from 2.29.8 to 2.29.9 Updates `software.amazon.awssdk:ec2` from 2.29.8 to 2.29.9 Updates `software.amazon.awssdk:ecs` from 2.29.8 to 2.29.9 Updates `software.amazon.awssdk:emr` from 2.29.8 to 2.29.9 Updates `software.amazon.awssdk:emrserverless` from 2.29.8 to 2.29.9 Updates `software.amazon.awssdk:apache-client` from 2.29.8 to 2.29.9 --- updated-dependencies: - dependency-name: software.amazon.awssdk:s3 dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: software.amazon.awssdk:s3-transfer-manager dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: software.amazon.awssdk:dynamodb dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: software.amazon.awssdk:sqs dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: software.amazon.awssdk:sts dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: software.amazon.awssdk:ecr dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: software.amazon.awssdk:cloudwatch dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: software.amazon.awssdk:cloudwatchevents dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: software.amazon.awssdk:cloudwatchlogs dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: software.amazon.awssdk:cloudformation dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: software.amazon.awssdk:lambda dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: software.amazon.awssdk:ec2 dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: software.amazon.awssdk:ecs dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: software.amazon.awssdk:emr dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: software.amazon.awssdk:emrserverless dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: software.amazon.awssdk:apache-client dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- java/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/pom.xml b/java/pom.xml index 124d1a97a5..96f9588413 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -124,7 +124,7 @@ 1.5.12 1.12.498 - 2.29.8 + 2.29.9 0.33.0 3.14.0 1.2.3 From 1cf433f2899c01ae7af2b5b2e30a3d6de00f1532 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 12 Nov 2024 03:21:00 +0000 Subject: [PATCH 20/20] Bump aws-java-sdk-v2.version from 2.29.9 to 2.29.10 in /java Bumps `aws-java-sdk-v2.version` from 2.29.9 to 2.29.10. Updates `software.amazon.awssdk:s3` from 2.29.9 to 2.29.10 Updates `software.amazon.awssdk:s3-transfer-manager` from 2.29.9 to 2.29.10 Updates `software.amazon.awssdk:dynamodb` from 2.29.9 to 2.29.10 Updates `software.amazon.awssdk:sqs` from 2.29.9 to 2.29.10 Updates `software.amazon.awssdk:sts` from 2.29.9 to 2.29.10 Updates `software.amazon.awssdk:ecr` from 2.29.9 to 2.29.10 Updates `software.amazon.awssdk:cloudwatch` from 2.29.9 to 2.29.10 Updates `software.amazon.awssdk:cloudwatchevents` from 2.29.9 to 2.29.10 Updates `software.amazon.awssdk:cloudwatchlogs` from 2.29.9 to 2.29.10 Updates `software.amazon.awssdk:cloudformation` from 2.29.9 to 2.29.10 Updates `software.amazon.awssdk:lambda` from 2.29.9 to 2.29.10 Updates `software.amazon.awssdk:ec2` from 2.29.9 to 2.29.10 Updates `software.amazon.awssdk:ecs` from 2.29.9 to 2.29.10 Updates `software.amazon.awssdk:emr` from 2.29.9 to 2.29.10 Updates `software.amazon.awssdk:emrserverless` from 2.29.9 to 2.29.10 Updates `software.amazon.awssdk:apache-client` from 2.29.9 to 2.29.10 --- updated-dependencies: - dependency-name: software.amazon.awssdk:s3 dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: software.amazon.awssdk:s3-transfer-manager dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: software.amazon.awssdk:dynamodb dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: software.amazon.awssdk:sqs dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: software.amazon.awssdk:sts dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: software.amazon.awssdk:ecr dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: software.amazon.awssdk:cloudwatch dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: software.amazon.awssdk:cloudwatchevents dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: software.amazon.awssdk:cloudwatchlogs dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: software.amazon.awssdk:cloudformation dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: software.amazon.awssdk:lambda dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: software.amazon.awssdk:ec2 dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: software.amazon.awssdk:ecs dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: software.amazon.awssdk:emr dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: software.amazon.awssdk:emrserverless dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: software.amazon.awssdk:apache-client dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- java/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/pom.xml b/java/pom.xml index 36129992ee..abc4139614 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -124,7 +124,7 @@ 1.5.12 1.12.498 - 2.29.9 + 2.29.10 0.33.0 3.14.0 1.2.3