diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 82dbb5fd85..c642746947 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -10,6 +10,7 @@ "vscjava.vscode-java-pack", "shengchen.vscode-checkstyle", "eamodio.gitlens", + "1yib.rust-bundle", "mhutchie.git-graph" ] } diff --git a/.github/config/chunks.yaml b/.github/config/chunks.yaml index 4778fd4201..02b4198523 100644 --- a/.github/config/chunks.yaml +++ b/.github/config/chunks.yaml @@ -22,7 +22,7 @@ chunks: compaction: name: Compaction workflow: chunk-compaction.yaml - modules: [ compaction/compaction-job-execution, compaction/compaction-task-creation, compaction/compaction-job-creation, compaction/compaction-job-creation-lambda, compaction/compaction-status-store, compaction/compaction-core, splitter/splitter-core, splitter/splitter-lambda ] + modules: [ compaction/compaction-job-execution, compaction/compaction-task-creation, compaction/compaction-job-creation, compaction/compaction-job-creation-lambda, compaction/compaction-status-store, compaction/compaction-core, splitter/splitter-core, splitter/splitter-lambda, compaction/compaction-rust ] data: name: Data workflow: chunk-data.yaml @@ -47,3 +47,5 @@ chunks: name: Trino workflow: chunk-trino.yaml modules: [ trino ] + + diff --git a/.github/workflows/chunk-cdk.yaml b/.github/workflows/chunk-cdk.yaml index 07ffa5111d..8faee98e66 100644 --- a/.github/workflows/chunk-cdk.yaml +++ b/.github/workflows/chunk-cdk.yaml @@ -26,6 +26,7 @@ on: - 'java/bulk-import/bulk-import-common/**' - 'java/splitter/splitter-core/**' - 'java/compaction/compaction-job-execution/**' + - 'java/compaction/compaction-rust/**' - 'java/ingest/ingest-batcher-core/**' - 'java/query/query-runner/**' - 'java/garbage-collector/**' diff --git a/.github/workflows/chunk-compaction.yaml b/.github/workflows/chunk-compaction.yaml index 4ebb2b081b..73e2d85458 100644 --- a/.github/workflows/chunk-compaction.yaml +++ b/.github/workflows/chunk-compaction.yaml @@ -12,6 +12,7 @@ on: - 'java/splitter/pom.xml' - 'java/compaction/compaction-job-execution/**' - 'java/compaction/compaction-task-creation/**' + - 'java/compaction/compaction-rust/**' - 'java/compaction/compaction-job-creation/**' - 'java/compaction/compaction-job-creation-lambda/**' - 'java/compaction/compaction-status-store/**' diff --git a/.github/workflows/chunk.yaml b/.github/workflows/chunk.yaml index 85ad74b807..c83b3929d5 100644 --- a/.github/workflows/chunk.yaml +++ b/.github/workflows/chunk.yaml @@ -4,6 +4,10 @@ on: chunkId: required: true type: string + skipRust: + default: true + required: false + type: boolean jobs: build: @@ -15,6 +19,11 @@ jobs: with: java-version: '17' distribution: 'corretto' + - uses: dtolnay/rust-toolchain@1.79.0 + if: ${{ ! inputs.skipRust }} + - name: Install cargo cross + run: cargo install cross + if: ${{ ! 
inputs.skipRust }} - name: Cache dependencies uses: actions/cache@v4 with: @@ -34,11 +43,11 @@ jobs: -Dexec.args="${{ inputs.chunkId }} github_actions_outputs ${{ github.workspace }}/.github/config/chunks.yaml" \ >> $GITHUB_OUTPUT - name: Compile - run: mvn --batch-mode clean install -am -pl ${{ steps.config.outputs.moduleList }} -Pquick,skipShade -Dmaven.repo.local=${{ runner.temp }}/.m2/repository + run: mvn --batch-mode clean install -am -pl ${{ steps.config.outputs.moduleList }} -Pquick,skipShade -DskipRust=${{ inputs.skipRust }} -Dmaven.repo.local=${{ runner.temp }}/.m2/repository working-directory: ./java - name: Test id: test - run: mvn --batch-mode --fail-at-end verify -pl ${{ steps.config.outputs.moduleList }} -Dmaven.repo.local=${{ runner.temp }}/.m2/repository -e + run: mvn --batch-mode --fail-at-end verify -pl ${{ steps.config.outputs.moduleList }} -DskipRust=${{ inputs.skipRust }} -Dmaven.repo.local=${{ runner.temp }}/.m2/repository -e working-directory: ./java - name: Generate site id: site diff --git a/.github/workflows/dependency-check.yaml b/.github/workflows/dependency-check.yaml index 946159ad36..e6c912ffaa 100644 --- a/.github/workflows/dependency-check.yaml +++ b/.github/workflows/dependency-check.yaml @@ -35,7 +35,7 @@ jobs: run: mvn --batch-mode dependency-check:update-only -Dmaven.repo.local=${{ runner.temp }}/.m2/repository working-directory: ./java - name: Build with Maven - run: mvn --batch-mode verify dependency-check:aggregate -Pquick -Dmaven.repo.local=${{ runner.temp }}/.m2/repository + run: mvn --batch-mode verify dependency-check:aggregate -Pquick -DskipRust -Dmaven.repo.local=${{ runner.temp }}/.m2/repository working-directory: ./java - name: Cache Maven dependencies & CVEs database uses: actions/cache/save@v3 diff --git a/.github/workflows/docker-cli-image.yaml b/.github/workflows/docker-cli-image.yaml index 508132878c..f873c53344 100644 --- a/.github/workflows/docker-cli-image.yaml +++ b/.github/workflows/docker-cli-image.yaml @@ -41,7 +41,12 @@ jobs: steps: - name: Delete huge unnecessary tools folder - run: rm -rf /opt/hostedtoolcache + run: | + rm -rf /opt/hostedtoolcache + rm -rf /usr/share/dotnet + rm -rf /opt/ghc + rm -rf "/usr/local/share/boost" + rm -rf "$AGENT_TOOLSDIRECTORY" - uses: actions/checkout@v3 - uses: actions/setup-java@v3 with: diff --git a/.github/workflows/docker-cli.yaml b/.github/workflows/docker-cli.yaml index e62c7e3c93..05af8bb333 100644 --- a/.github/workflows/docker-cli.yaml +++ b/.github/workflows/docker-cli.yaml @@ -79,7 +79,7 @@ jobs: needs: setup uses: ./.github/workflows/docker-cli-image.yaml with: - mavenCmd: ./scripts/cli/environment/buildMaven.sh package -Pquick --batch-mode -Dmaven.repo.local=../.m2/repository + mavenCmd: ./scripts/cli/environment/buildMaven.sh package -Pquick -DskipRust --batch-mode -Dmaven.repo.local=../.m2/repository pushImages: ${{ inputs.pushImages }} context: ./scripts/cli/environment pushTag: ${{ needs.setup.outputs.envTag }} diff --git a/.github/workflows/maven-full.yaml b/.github/workflows/maven-full.yaml index b2df24988e..574e6225ba 100644 --- a/.github/workflows/maven-full.yaml +++ b/.github/workflows/maven-full.yaml @@ -29,7 +29,7 @@ jobs: run: mvn de.qaware.maven:go-offline-maven-plugin:resolve-dependencies -Dmaven.repo.local=${{ runner.temp }}/.m2/repository working-directory: ./java - name: Build with Maven - run: mvn --batch-mode verify -Pquick -T 1C -Dmaven.repo.local=${{ runner.temp }}/.m2/repository + run: mvn --batch-mode verify -Pquick -T 1C -DskipRust -Dmaven.repo.local=${{ 
runner.temp }}/.m2/repository working-directory: ./java - name: Validate properties templates are up to date working-directory: ./java diff --git a/.gitignore b/.gitignore index 7ef4e430e3..d64768bb80 100644 --- a/.gitignore +++ b/.gitignore @@ -24,3 +24,4 @@ __pycache__ python/build/ python/src/sleeper.egg-info/ python/env/ + diff --git a/NOTICES b/NOTICES index 6216fea938..38344e85d6 100644 --- a/NOTICES +++ b/NOTICES @@ -283,6 +283,107 @@ s3fs: - The 3-Clause BSD License +Sleeper contains Rust code. This has the following dependencies. + +Rust Object Store (https://github.com/apache/arrow-rs/tree/master/object_store) + +- Apache License, Version 2.0 + +color-eyre (https://github.com/yaahc/color-eyre) + +- Apache License, Version 2.0 + +Tokio (https://github.com/tokio-rs/tokio) + +- MIT License + +owo-colors (https://github.com/jam1garner/owo-colors) + +- MIT License + +thiserror (https://github.com/dtolnay/thiserror) + +- Apache License, Version 2.0 + +log (https://github.com/rust-lang/log) + +- Apache License, Version 2.0 + +env_logger (https://github.com/rust-cli/env_logger/) + +- Apache License, Version 2.0 + +human-panic (https://github.com/rust-cli/human-panic) + +- Apache License, Version 2.0 + +clap (https://github.com/clap-rs/clap) + +- Apache License, Version 2.0 + +libc (https://github.com/rust-lang/libc) + +- Apache License, Version 2.0 + +arrow (https://github.com/apache/arrow-rs) + +- Apache License, Version 2.0 + +futures (https://github.com/rust-lang/futures-rs) + +- MIT License + +itertools (https://github.com/rust-itertools/itertools) + +- Apache License, Version 2.0 + +object_store (https://github.com/apache/arrow-rs/tree/master/object_store) + +- Apache License, Version 2.0 + +aws-config (https://github.com/awslabs/smithy-rs) + +- Apache License, Version 2.0 + +aws-credentials (https://github.com/awslabs/smithy-rs) + +- Apache License, Version 2.0 + +aws-types (https://github.com/awslabs/smithy-rs) + +- Apache License, Version 2.0 + +url (https://github.com/servo/rust-url) + +- Apache License, Version 2.0 + +bytes (https://github.com/tokio-rs/bytes) + +- MIT License + +tokio-test (https://github.com/tokio-rs/tokio) + +- MIT License + +chrono (https://github.com/chronotope/chrono) + +- Apache License, Version 2.0 + +num-format (https://github.com/bcmyers/num-format) + +- Apache License, Version 2.0 + +cxx (https://github.com/dtolnay/cxx) + +- Apache License, Version 2.0 + +datasketches-cpp (https://github.com/apache/datasketches-cpp) + +- Apache License, Version 2.0 + +git2 (https://github.com/rust-lang/git2-rs) + +- Apache License, Version 2.0 The build pipeline uses the following GitHub Actions from the marketplace. diff --git a/docs/11-dev-guide.md b/docs/11-dev-guide.md index 1522a24fe2..18c9e2c887 100644 --- a/docs/11-dev-guide.md +++ b/docs/11-dev-guide.md @@ -85,6 +85,8 @@ You will need the following software: * [Java 11/17](https://openjdk.java.net/install/) * [Maven](https://maven.apache.org/): Tested with v3.8.6 * [NodeJS / NPM](https://github.com/nvm-sh/nvm#installing-and-updating): Tested with NodeJS v16.16.0 and npm v8.11.0 +* [Rust](https://rustup.rs/): Tested with Rust v1.77 +* [Cross-rs](https://github.com/cross-rs/cross) ## Building @@ -122,6 +124,15 @@ mvn clean install -Pquick Removing the '-Pquick' option will cause the unit and integration tests to run.
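Since the Rust modules now build by default, the full Maven build needs a one-off Rust toolchain setup first. A minimal sketch, assuming `rustup` is used as linked above (note the CI workflow pins Rust 1.79.0, and `cross` needs a running Docker daemon):

```bash
# One-off setup: install the Rust toolchain, then the cross-compilation wrapper
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
cargo install cross

# The Maven build shells out to cross for each Linux target (run from the rust/
# directory), equivalent to:
cross build --release --target x86_64-unknown-linux-gnu
cross build --release --target aarch64-unknown-linux-gnu
```

These are the same `cross build` invocations configured in the new `compaction-rust` module's exec-maven-plugin executions further down in this diff.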
+### Disabling Rust component + +You can disable the building of the Rust modules with: + +```bash +cd java +mvn clean install -Pquick -DskipRust=true +``` + ## Using the codebase The codebase is structured around the components explained in the [design document](12-design.md). The elements of the @@ -141,19 +152,19 @@ For VS Code there's [a separate setup guide](/.vscode/README.md). For IntelliJ, these settings are available to import: -- Code style scheme at [code-style/intellij-style.xml](/code-style/intellij-style.xml) -- Inspection profile at [code-style/intellij-inspection-profile.xml](/code-style/intellij-inspection-profile.xml) -- Copyright profile for license header +* Code style scheme at [code-style/intellij-style.xml](/code-style/intellij-style.xml) +* Inspection profile at [code-style/intellij-inspection-profile.xml](/code-style/intellij-inspection-profile.xml) +* Copyright profile for license header at [code-style/intellij-copyright-profile.xml](/code-style/intellij-copyright-profile.xml) -- Checkstyle plugin settings in [code-style/checkstyle-idea](/code-style/checkstyle-idea) +* Checkstyle plugin settings in [code-style/checkstyle-idea](/code-style/checkstyle-idea) For Eclipse, these settings are available to import: -- Code style at [code-style/eclipse-style.xml](/code-style/eclipse-style.xml) -- Import order at [code-style/eclipse-import-order.importorder](/code-style/eclipse-import-order.importorder) -- License header at [code-style/licenseHeader.txt](/code-style/licenseHeader.txt) -- Code templates at [code-style/eclipse-codetemplates.xml](/code-style/eclipse-codetemplates.xml) -- Editor templates at [code-style/eclipse-templates.xml](/code-style/eclipse-templates.xml) +* Code style at [code-style/eclipse-style.xml](/code-style/eclipse-style.xml) +* Import order at [code-style/eclipse-import-order.importorder](/code-style/eclipse-import-order.importorder) +* License header at [code-style/licenseHeader.txt](/code-style/licenseHeader.txt) +* Code templates at [code-style/eclipse-codetemplates.xml](/code-style/eclipse-codetemplates.xml) +* Editor templates at [code-style/eclipse-templates.xml](/code-style/eclipse-templates.xml) ### Linting @@ -173,7 +184,7 @@ We try to ensure that all classes have Javadoc. Most methods should also have Ja getters and setters can be skipped unless there's something important to know. See Oracle's standards for Javadoc: -https://www.oracle.com/technical-resources/articles/java/javadoc-tool.html + Note that the first sentence in a Javadoc comment will be used as a summary fragment in generated documentation. This should not contain any links or formatting, to read normally as an item in a list. @@ -244,8 +255,8 @@ When deploying multiple instances (or running multiple system tests), many log g it difficult to find the logs you need to view. This script will delete any log groups that meet all of the following criteria: -- Its name does not contain the name of any deployed CloudFormation stack -- Either it's empty, or it has no retention period and is older than 30 days +* Its name does not contain the name of any deployed CloudFormation stack +* Either it's empty, or it has no retention period and is older than 30 days This can be used to limit the number of log groups in your AWS account, particularly if all your log groups are deployed by the CDK or CloudFormation, with the stack name in the log group name. 
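The example configuration files below document the new compaction method property. To opt a single table into the experimental Rust compactor, the table-level property can be overridden (a sketch; `JAVA` remains the default):

```properties
# table.properties - per-table override of sleeper.default.table.compaction.method
sleeper.table.compaction.method=RUST
```

If the chosen method cannot handle a particular job (for example, when the table has an iterator attached), the selector introduced later in this diff falls back to the Java implementation.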
diff --git a/example/full/instance.properties b/example/full/instance.properties index 234bdc2fa2..5b1d09e29c 100644 --- a/example/full/instance.properties +++ b/example/full/instance.properties @@ -940,6 +940,10 @@ sleeper.default.table.compaction.strategy.sizeratio.ratio=3 # concurrently per partition. It can be overridden on a per-table basis. sleeper.default.table.compaction.strategy.sizeratio.max.concurrent.jobs.per.partition=100000 +# Select what compaction method to use on a table. Current options are JAVA and RUST. Rust compaction +# support is experimental. +sleeper.default.table.compaction.method=JAVA + ## The following properties relate to queries. diff --git a/example/full/table.properties b/example/full/table.properties index 1f868a5e17..c7efaef5d8 100644 --- a/example/full/table.properties +++ b/example/full/table.properties @@ -120,6 +120,10 @@ sleeper.table.compaction.strategy.sizeratio.ratio=3 # concurrently per partition. sleeper.table.compaction.strategy.sizeratio.max.concurrent.jobs.per.partition=2147483647 +# Select what compaction method to use on a table. Current options are JAVA and RUST. Rust compaction +# support is experimental. +sleeper.table.compaction.method=JAVA + ## The following table properties relate to storing and retrieving metadata for tables. diff --git a/java/build/src/main/java/sleeper/build/github/actions/WorkflowTriggerPathsDiff.java b/java/build/src/main/java/sleeper/build/github/actions/WorkflowTriggerPathsDiff.java index f0a106c01b..bc982f0316 100644 --- a/java/build/src/main/java/sleeper/build/github/actions/WorkflowTriggerPathsDiff.java +++ b/java/build/src/main/java/sleeper/build/github/actions/WorkflowTriggerPathsDiff.java @@ -112,7 +112,7 @@ public int hashCode() { @Override public String toString() { - return "OnPushPathsDiff{" + + return "OnPullRequestPathsDiff{" + "expected=" + expected + ", actual=" + actual + ", missingEntries=" + missingEntries + diff --git a/java/compaction/compaction-core/src/main/java/sleeper/compaction/job/CompactionJobStatusStore.java b/java/compaction/compaction-core/src/main/java/sleeper/compaction/job/CompactionJobStatusStore.java index 1dd9ffc671..4d95a28764 100644 --- a/java/compaction/compaction-core/src/main/java/sleeper/compaction/job/CompactionJobStatusStore.java +++ b/java/compaction/compaction-core/src/main/java/sleeper/compaction/job/CompactionJobStatusStore.java @@ -28,7 +28,6 @@ import java.util.stream.Stream; public interface CompactionJobStatusStore { - CompactionJobStatusStore NONE = new CompactionJobStatusStore() { }; diff --git a/java/compaction/compaction-core/src/main/java/sleeper/compaction/job/CompactionRunner.java b/java/compaction/compaction-core/src/main/java/sleeper/compaction/job/CompactionRunner.java new file mode 100644 index 0000000000..ce43365a5f --- /dev/null +++ b/java/compaction/compaction-core/src/main/java/sleeper/compaction/job/CompactionRunner.java @@ -0,0 +1,23 @@ +/* + * Copyright 2022-2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package sleeper.compaction.job; + +import sleeper.core.record.process.RecordsProcessed; + +@FunctionalInterface +public interface CompactionRunner extends CompactionRunnerDetails { + RecordsProcessed compact(CompactionJob job) throws Exception; +} diff --git a/java/compaction/compaction-core/src/main/java/sleeper/compaction/job/CompactionRunnerDetails.java b/java/compaction/compaction-core/src/main/java/sleeper/compaction/job/CompactionRunnerDetails.java new file mode 100644 index 0000000000..d856933fd4 --- /dev/null +++ b/java/compaction/compaction-core/src/main/java/sleeper/compaction/job/CompactionRunnerDetails.java @@ -0,0 +1,46 @@ +/* + * Copyright 2022-2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package sleeper.compaction.job; + +public interface CompactionRunnerDetails { + /** + * Some compaction implementations may use hardware acceleration such as GPUs. + * + * @return true iff this compaction implementation uses any sort of hardware acceleration + */ + default boolean isHardwareAccelerated() { + return false; + } + + /** + * What language is this implemented in? If multiple languages are used, the primary + * one used for performing the compaction computation should be returned. + * + * @return the principal implementation language for this compactor + */ + default String implementationLanguage() { + return "Java"; + } + + /** + * States whether this compactor can compact Sleeper tables that have iterators attached to them. + * + * @return true if iterators can be processed by this compactor + */ + default boolean supportsIterators() { + return false; + } +} diff --git a/java/compaction/compaction-core/src/main/java/sleeper/compaction/task/CompactionAlgorithmSelector.java b/java/compaction/compaction-core/src/main/java/sleeper/compaction/task/CompactionAlgorithmSelector.java new file mode 100644 index 0000000000..24adf6e352 --- /dev/null +++ b/java/compaction/compaction-core/src/main/java/sleeper/compaction/task/CompactionAlgorithmSelector.java @@ -0,0 +1,34 @@ +/* + * Copyright 2022-2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package sleeper.compaction.task; + +import sleeper.compaction.job.CompactionJob; +import sleeper.compaction.job.CompactionRunner; + +/** + * Interface for classes that implement the logic for choosing which compaction method to use.
+ */ +@FunctionalInterface +public interface CompactionAlgorithmSelector { + /** + * Picks a CompactionRunner implementation that is capable + * of running a compaction on the given job. + * + * @param job compaction job + * @return a compactor object + */ + CompactionRunner chooseCompactor(CompactionJob job); +} diff --git a/java/compaction/compaction-core/src/main/java/sleeper/compaction/task/CompactionTask.java b/java/compaction/compaction-core/src/main/java/sleeper/compaction/task/CompactionTask.java index 5af32d3144..149554ab61 100644 --- a/java/compaction/compaction-core/src/main/java/sleeper/compaction/task/CompactionTask.java +++ b/java/compaction/compaction-core/src/main/java/sleeper/compaction/task/CompactionTask.java @@ -21,6 +21,7 @@ import sleeper.compaction.job.CompactionJob; import sleeper.compaction.job.CompactionJobStatusStore; +import sleeper.compaction.job.CompactionRunner; import sleeper.compaction.job.commit.CompactionJobCommitterOrSendToLambda; import sleeper.configuration.properties.PropertiesReloader; import sleeper.configuration.properties.instance.InstanceProperties; @@ -55,7 +56,7 @@ public class CompactionTask { private final PropertiesReloader propertiesReloader; private final Consumer sleepForTime; private final MessageReceiver messageReceiver; - private final CompactionRunner compactor; + private final CompactionAlgorithmSelector selector; private final CompactionJobStatusStore jobStatusStore; private final CompactionTaskStatusStore taskStatusStore; private final CompactionJobCommitterOrSendToLambda jobCommitter; @@ -65,26 +66,26 @@ public class CompactionTask { private final WaitForFileAssignment waitForFiles; public CompactionTask(InstanceProperties instanceProperties, PropertiesReloader propertiesReloader, - MessageReceiver messageReceiver, WaitForFileAssignment waitForFiles, CompactionRunner compactor, + MessageReceiver messageReceiver, WaitForFileAssignment waitForFiles, CompactionJobCommitterOrSendToLambda jobCommitter, CompactionJobStatusStore jobStore, - CompactionTaskStatusStore taskStore, String taskId) { - this(instanceProperties, propertiesReloader, messageReceiver, waitForFiles, compactor, jobCommitter, - jobStore, taskStore, taskId, () -> UUID.randomUUID().toString(), Instant::now, threadSleep()); + CompactionTaskStatusStore taskStore, CompactionAlgorithmSelector selector, String taskId) { + this(instanceProperties, propertiesReloader, messageReceiver, waitForFiles, jobCommitter, + jobStore, taskStore, selector, taskId, () -> UUID.randomUUID().toString(), Instant::now, threadSleep()); } public CompactionTask( InstanceProperties instanceProperties, PropertiesReloader propertiesReloader, MessageReceiver messageReceiver, WaitForFileAssignment waitForFiles, - CompactionRunner compactor, CompactionJobCommitterOrSendToLambda jobCommitter, - CompactionJobStatusStore jobStore, CompactionTaskStatusStore taskStore, + CompactionJobCommitterOrSendToLambda jobCommitter, + CompactionJobStatusStore jobStore, CompactionTaskStatusStore taskStore, CompactionAlgorithmSelector selector, String taskId, Supplier jobRunIdSupplier, Supplier timeSupplier, Consumer sleepForTime) { this.instanceProperties = instanceProperties; this.propertiesReloader = propertiesReloader; this.timeSupplier = timeSupplier; this.sleepForTime = sleepForTime; this.messageReceiver = messageReceiver; - this.compactor = compactor; + this.selector = selector; this.jobStatusStore = jobStore; this.taskStatusStore = taskStore; this.taskId = taskId; @@ -163,6 +164,7 @@ private RecordsProcessedSummary 
compact(CompactionJob job, String jobRunId, Inst LOGGER.info("Compaction job {}: compaction called at {}", job.getId(), jobStartTime); jobStatusStore.jobStarted(compactionJobStarted(job, jobStartTime).taskId(taskId).jobRunId(jobRunId).build()); propertiesReloader.reloadIfNeeded(); + CompactionRunner compactor = this.selector.chooseCompactor(job); RecordsProcessed recordsProcessed = compactor.compact(job); Instant jobFinishTime = timeSupplier.get(); RecordsProcessedSummary summary = new RecordsProcessedSummary(recordsProcessed, jobStartTime, jobFinishTime); @@ -185,11 +187,6 @@ public interface MessageReceiver { Optional receiveMessage() throws IOException; } - @FunctionalInterface - public interface CompactionRunner { - RecordsProcessed compact(CompactionJob job) throws Exception; - } - @FunctionalInterface public interface WaitForFileAssignment { void wait(CompactionJob job) throws InterruptedException; diff --git a/java/compaction/compaction-core/src/test/java/sleeper/compaction/task/CompactionTaskTestBase.java b/java/compaction/compaction-core/src/test/java/sleeper/compaction/task/CompactionTaskTestBase.java index e9e7f7bb43..48a575b7d1 100644 --- a/java/compaction/compaction-core/src/test/java/sleeper/compaction/task/CompactionTaskTestBase.java +++ b/java/compaction/compaction-core/src/test/java/sleeper/compaction/task/CompactionTaskTestBase.java @@ -18,9 +18,9 @@ import org.junit.jupiter.api.BeforeEach; import sleeper.compaction.job.CompactionJob; +import sleeper.compaction.job.CompactionRunner; import sleeper.compaction.job.commit.CompactionJobCommitRequest; import sleeper.compaction.job.commit.CompactionJobCommitterOrSendToLambda; -import sleeper.compaction.task.CompactionTask.CompactionRunner; import sleeper.compaction.task.CompactionTask.MessageHandle; import sleeper.compaction.task.CompactionTask.MessageReceiver; import sleeper.compaction.task.CompactionTask.WaitForFileAssignment; @@ -136,9 +136,10 @@ private void runTask( CompactionJobCommitterOrSendToLambda committer = new CompactionJobCommitterOrSendToLambda( new FixedTablePropertiesProvider(tables), stateStoreByTableId::get, jobStore, commitRequestsOnQueue::add, timeSupplier); + CompactionAlgorithmSelector selector = job -> compactor; new CompactionTask(instanceProperties, PropertiesReloader.neverReload(), messageReceiver, fileAssignmentCheck, - compactor, committer, jobStore, taskStore, taskId, jobRunIdSupplier, timeSupplier, sleeps::add) + committer, jobStore, taskStore, selector, taskId, jobRunIdSupplier, timeSupplier, sleeps::add) .run(); } diff --git a/java/compaction/compaction-job-execution/docker/Dockerfile b/java/compaction/compaction-job-execution/docker/Dockerfile index 34a40e8443..8de5aeb3f2 100644 --- a/java/compaction/compaction-job-execution/docker/Dockerfile +++ b/java/compaction/compaction-job-execution/docker/Dockerfile @@ -11,7 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
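The refactor above replaces `CompactionTask`'s fixed `CompactionRunner` with a `CompactionAlgorithmSelector`, so a runner is now chosen per job, and each runner describes its capabilities through the `CompactionRunnerDetails` defaults. As a hypothetical illustration (this class is not part of the change), a third implementation might advertise itself like this:

```java
package sleeper.compaction.job;

import sleeper.core.record.process.RecordsProcessed;

/** Illustrative sketch only: a runner advertising hardware acceleration to the selector. */
public class GpuCompactor implements CompactionRunner {
    @Override
    public RecordsProcessed compact(CompactionJob job) throws Exception {
        // ... run the compaction and report records read/written
        throw new UnsupportedOperationException("sketch only");
    }

    @Override
    public boolean isHardwareAccelerated() {
        return true; // default is false
    }

    @Override
    public String implementationLanguage() {
        return "CUDA"; // default is "Java"
    }
}
```

Because `CompactionAlgorithmSelector` is itself a functional interface, tests such as `CompactionTaskTestBase` above can pin every job to one runner with a lambda: `CompactionAlgorithmSelector selector = job -> compactor;`.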
-FROM amazoncorretto:11 +FROM amazonlinux:2023.5.20240701.0 +RUN yum install -y java-11-amazon-corretto-devel COPY compaction-job-execution.jar /compaction-job-execution.jar COPY run.sh /run.sh diff --git a/java/compaction/compaction-job-execution/pom.xml b/java/compaction/compaction-job-execution/pom.xml index 6dcde7ed27..238b6b2c9d 100644 --- a/java/compaction/compaction-job-execution/pom.xml +++ b/java/compaction/compaction-job-execution/pom.xml @@ -14,8 +14,9 @@ ~ See the License for the specific language governing permissions and ~ limitations under the License. --> - + compaction sleeper @@ -51,6 +52,11 @@ compaction-status-store ${project.parent.version} + + sleeper + compaction-rust + ${project.parent.version} + sleeper common-job @@ -180,4 +186,4 @@ - + \ No newline at end of file diff --git a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionMethod.java b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionMethod.java new file mode 100644 index 0000000000..d03600e412 --- /dev/null +++ b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionMethod.java @@ -0,0 +1,32 @@ +/* + * Copyright 2022-2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package sleeper.compaction.job.execution; + +/** + * Different compaction methods for Sleeper which support different capabilities and must be + * selected based on need. + */ +public enum CompactionMethod { + /** Pure Java compaction implementation. */ + JAVA, + /** + * Rust compaction method. This uses a native library written in Rust to perform a + * compaction. + */ + RUST; + + public static final CompactionMethod DEFAULT = CompactionMethod.JAVA; +} diff --git a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/DefaultSelector.java b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/DefaultSelector.java new file mode 100644 index 0000000000..f40813c010 --- /dev/null +++ b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/DefaultSelector.java @@ -0,0 +1,87 @@ +/* + * Copyright 2022-2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package sleeper.compaction.job.execution; + +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import sleeper.compaction.job.CompactionJob; +import sleeper.compaction.job.CompactionRunner; +import sleeper.compaction.task.CompactionAlgorithmSelector; +import sleeper.configuration.jars.ObjectFactory; +import sleeper.configuration.properties.table.TableProperties; +import sleeper.configuration.properties.table.TablePropertiesProvider; +import sleeper.configuration.properties.table.TableProperty; +import sleeper.statestore.StateStoreProvider; + +import java.util.Locale; + +/** + * Determines which compaction algorithm should be run based on the table and instance configuration properties and + * other environmental information. + */ +public class DefaultSelector implements CompactionAlgorithmSelector { + private final TablePropertiesProvider tablePropertiesProvider; + private final ObjectFactory objectFactory; + private final StateStoreProvider stateStoreProvider; + private final Configuration configuration; + + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSelector.class); + + public DefaultSelector( + TablePropertiesProvider tablePropertiesProvider, + StateStoreProvider stateStoreProvider, ObjectFactory objectFactory, Configuration configuration) { + this.tablePropertiesProvider = tablePropertiesProvider; + this.objectFactory = objectFactory; + this.stateStoreProvider = stateStoreProvider; + this.configuration = configuration; + } + + @Override + public CompactionRunner chooseCompactor(CompactionJob job) { + TableProperties tableProperties = tablePropertiesProvider + .getById(job.getTableId()); + String method = tableProperties.get(TableProperty.COMPACTION_METHOD).toUpperCase(Locale.UK); + + // Convert to enum value and default to Java + CompactionMethod desired; + try { + desired = CompactionMethod.valueOf(method); + } catch (IllegalArgumentException e) { + desired = CompactionMethod.DEFAULT; + } + + CompactionRunner defaultRunner = new StandardCompactor(tablePropertiesProvider, stateStoreProvider, objectFactory, configuration); + CompactionRunner runner = defaultRunner; + switch (desired) { + case RUST: + runner = new RustCompaction(tablePropertiesProvider, stateStoreProvider); + break; + default: + break; + } + + // Is an iterator specified? If so, can we support it? + if (job.getIteratorClassName() != null && !runner.supportsIterators()) { + LOGGER.debug("Table has an iterator set, which compactor {} doesn't support, falling back to default", runner.getClass().getSimpleName()); + runner = defaultRunner; + } + + LOGGER.info("Selecting {} compactor (language {}) for job ID {} table ID {}", runner.getClass().getSimpleName(), runner.implementationLanguage(), job.getId(), job.getTableId()); + return runner; + } +} diff --git a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/ECSCompactionTaskRunner.java b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/ECSCompactionTaskRunner.java index 3a489baad2..6df1e115dd 100644 --- a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/ECSCompactionTaskRunner.java +++ b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/ECSCompactionTaskRunner.java @@ -57,7 +57,7 @@ import static sleeper.configuration.utils.AwsV1ClientHelper.buildAwsV1Client; /** - * Runs a compaction task in ECS. 
Delegates the running of compaction jobs to {@link CompactSortedFiles}, + * Runs a compaction task in ECS. Delegates the running of compaction jobs to {@link DefaultSelector}, * and the processing of SQS messages to {@link SqsCompactionQueueHandler}. */ public class ECSCompactionTaskRunner { @@ -97,14 +97,17 @@ public static void main(String[] args) throws IOException, ObjectFactoryExceptio String taskId = UUID.randomUUID().toString(); ObjectFactory objectFactory = new ObjectFactory(instanceProperties, s3Client, "/tmp"); + + DefaultSelector compactionSelector = new DefaultSelector(tablePropertiesProvider, stateStoreProvider, objectFactory, + HadoopConfigurationProvider.getConfigurationForECS(instanceProperties)); + WaitForFileAssignment waitForFiles = new StateStoreWaitForFiles(stateStoreProvider.byTableId(tablePropertiesProvider)); - CompactSortedFiles compactSortedFiles = new CompactSortedFiles(instanceProperties, - tablePropertiesProvider, stateStoreProvider, objectFactory); + CompactionJobCommitterOrSendToLambda committerOrLambda = committerOrSendToLambda( tablePropertiesProvider, stateStoreProvider, jobStatusStore, instanceProperties, sqsClient); CompactionTask task = new CompactionTask(instanceProperties, propertiesReloader, new SqsCompactionQueueHandler(sqsClient, instanceProperties), - waitForFiles, compactSortedFiles, committerOrLambda, jobStatusStore, taskStatusStore, taskId); + waitForFiles, committerOrLambda, jobStatusStore, taskStatusStore, compactionSelector, taskId); task.run(); } finally { sqsClient.shutdown(); diff --git a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactSortedFiles.java b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/StandardCompactor.java similarity index 89% rename from java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactSortedFiles.java rename to java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/StandardCompactor.java index 6f4ff80e18..0b309cc2a8 100644 --- a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactSortedFiles.java +++ b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/StandardCompactor.java @@ -25,10 +25,9 @@ import org.slf4j.LoggerFactory; import sleeper.compaction.job.CompactionJob; -import sleeper.compaction.task.CompactionTask; +import sleeper.compaction.job.CompactionRunner; import sleeper.configuration.jars.ObjectFactory; import sleeper.configuration.jars.ObjectFactoryException; -import sleeper.configuration.properties.instance.InstanceProperties; import sleeper.configuration.properties.table.TableProperties; import sleeper.configuration.properties.table.TablePropertiesProvider; import sleeper.core.iterator.CloseableIterator; @@ -41,11 +40,9 @@ import sleeper.core.schema.Schema; import sleeper.core.statestore.StateStore; import sleeper.core.statestore.StateStoreException; -import sleeper.core.util.ExponentialBackoffWithJitter.WaitRange; import sleeper.io.parquet.record.ParquetReaderIterator; import sleeper.io.parquet.record.ParquetRecordReader; import sleeper.io.parquet.record.ParquetRecordWriterFactory; -import sleeper.io.parquet.utils.HadoopConfigurationProvider; import sleeper.io.parquet.utils.RangeQueryUtils; import sleeper.sketches.Sketches; import sleeper.sketches.s3.SketchesSerDeToS3; @@ -62,24 +59,15 @@ /** * Executes a compaction job. 
Compacts N input files into a single output file. */ -public class CompactSortedFiles implements CompactionTask.CompactionRunner { - private static final Logger LOGGER = LoggerFactory.getLogger(CompactSortedFiles.class); - public static final int JOB_ASSIGNMENT_WAIT_ATTEMPTS = 10; - public static final WaitRange JOB_ASSIGNMENT_WAIT_RANGE = WaitRange.firstAndMaxWaitCeilingSecs(2, 60); - +public class StandardCompactor implements CompactionRunner { private final TablePropertiesProvider tablePropertiesProvider; private final ObjectFactory objectFactory; private final StateStoreProvider stateStoreProvider; private final Configuration configuration; - public CompactSortedFiles( - InstanceProperties instanceProperties, TablePropertiesProvider tablePropertiesProvider, - StateStoreProvider stateStoreProvider, ObjectFactory objectFactory) { - this(tablePropertiesProvider, stateStoreProvider, objectFactory, - HadoopConfigurationProvider.getConfigurationForECS(instanceProperties)); - } + private static final Logger LOGGER = LoggerFactory.getLogger(StandardCompactor.class); - public CompactSortedFiles( + public StandardCompactor( TablePropertiesProvider tablePropertiesProvider, StateStoreProvider stateStoreProvider, ObjectFactory objectFactory, Configuration configuration) { this.tablePropertiesProvider = tablePropertiesProvider; @@ -106,7 +94,8 @@ public RecordsProcessed compact(CompactionJob compactionJob) throws IOException, LOGGER.debug("Creating writer for file {}", compactionJob.getOutputFile()); Path outputPath = new Path(compactionJob.getOutputFile()); // Setting file writer mode to OVERWRITE so if the same job runs again after failing to - // update the state store, it will overwrite the existing output file written by the previous run + // update the state store, it will overwrite the existing output file written + // by the previous run ParquetWriter writer = ParquetRecordWriterFactory.createParquetRecordWriter( outputPath, tableProperties, configuration, ParquetFileWriter.Mode.OVERWRITE); @@ -148,6 +137,7 @@ public RecordsProcessed compact(CompactionJob compactionJob) throws IOException, private List> createInputIterators(CompactionJob compactionJob, Partition partition, Schema schema) throws IOException { List> inputIterators = new ArrayList<>(); + FilterCompat.Filter partitionFilter = FilterCompat.get(RangeQueryUtils.getFilterPredicate(partition)); for (String file : compactionJob.getInputFiles()) { ParquetReader reader = new ParquetRecordReader.Builder(new Path(file), schema) @@ -176,6 +166,7 @@ public static CloseableIterator getMergingIterator( } catch (ObjectFactoryException e) { throw new IteratorCreationException("ObjectFactoryException creating iterator of class " + compactionJob.getIteratorClassName(), e); } + LOGGER.debug("Created iterator of class {}", compactionJob.getIteratorClassName()); iterator.init(compactionJob.getIteratorConfig(), schema); LOGGER.debug("Initialised iterator with config {}", compactionJob.getIteratorConfig()); @@ -183,4 +174,19 @@ public static CloseableIterator getMergingIterator( } return mergingIterator; } + + @Override + public boolean supportsIterators() { + return true; + } + + @Override + public String implementationLanguage() { + return "Java"; + } + + @Override + public boolean isHardwareAccelerated() { + return false; + } } diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesEmptyOutputIT.java 
b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesEmptyOutputIT.java index 331ea493c2..fb5a2063e4 100644 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesEmptyOutputIT.java +++ b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesEmptyOutputIT.java @@ -18,6 +18,7 @@ import org.junit.jupiter.api.Test; import sleeper.compaction.job.CompactionJob; +import sleeper.compaction.job.CompactionRunner; import sleeper.compaction.job.execution.testutils.CompactSortedFilesTestBase; import sleeper.core.partition.PartitionsBuilder; import sleeper.core.record.Record; @@ -25,6 +26,7 @@ import sleeper.core.schema.Schema; import sleeper.core.schema.type.LongType; import sleeper.core.statestore.FileReference; +import sleeper.io.parquet.utils.HadoopConfigurationProvider; import java.util.List; @@ -52,8 +54,10 @@ void shouldMergeFilesCorrectlyWhenSomeAreEmpty() throws Exception { assignJobIdToInputFiles(stateStore, compactionJob); // When - CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema); - RecordsProcessed summary = compactSortedFiles.compact(compactionJob); + DefaultSelector selector = createCompactionSelector(schema, + HadoopConfigurationProvider.getConfigurationForECS(instanceProperties)); + CompactionRunner runner = selector.chooseCompactor(compactionJob); + RecordsProcessed summary = runner.compact(compactionJob); // Then // - Read output file and check that it contains the right results @@ -76,8 +80,10 @@ void shouldMergeFilesCorrectlyWhenAllAreEmpty() throws Exception { assignJobIdToInputFiles(stateStore, compactionJob); // When - CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema); - RecordsProcessed summary = compactSortedFiles.compact(compactionJob); + DefaultSelector selector = createCompactionSelector(schema, + HadoopConfigurationProvider.getConfigurationForECS(instanceProperties)); + CompactionRunner runner = selector.chooseCompactor(compactionJob); + RecordsProcessed summary = runner.compact(compactionJob); // Then // - Read output file and check that it contains the right results diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesIT.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesIT.java index 563bba9688..aa570ae145 100644 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesIT.java +++ b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesIT.java @@ -20,6 +20,7 @@ import org.junit.jupiter.api.Test; import sleeper.compaction.job.CompactionJob; +import sleeper.compaction.job.CompactionRunner; import sleeper.compaction.job.execution.testutils.CompactSortedFilesTestBase; import sleeper.compaction.job.execution.testutils.CompactSortedFilesTestData; import sleeper.core.partition.PartitionsBuilder; @@ -30,6 +31,7 @@ import sleeper.core.schema.type.LongType; import sleeper.core.schema.type.StringType; import sleeper.core.statestore.FileReference; +import sleeper.io.parquet.utils.HadoopConfigurationProvider; import java.util.List; @@ -55,8 +57,10 @@ void shouldMergeFilesCorrectlyAndUpdateStateStoreWithLongKey() throws Exception assignJobIdToInputFiles(stateStore, compactionJob); // When - CompactSortedFiles compactSortedFiles = 
createCompactSortedFiles(schema); - RecordsProcessed summary = compactSortedFiles.compact(compactionJob); + DefaultSelector selector = createCompactionSelector(schema, + HadoopConfigurationProvider.getConfigurationForECS(instanceProperties)); + CompactionRunner runner = selector.chooseCompactor(compactionJob); + RecordsProcessed summary = runner.compact(compactionJob); // Then // - Read output file and check that it contains the right results @@ -102,8 +106,10 @@ void shouldMergeFilesCorrectlyAndUpdateStateStoreWithStringKey() throws Exceptio assignJobIdToInputFiles(stateStore, compactionJob); // When - CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema); - RecordsProcessed summary = compactSortedFiles.compact(compactionJob); + DefaultSelector selector = createCompactionSelector(schema, + HadoopConfigurationProvider.getConfigurationForECS(instanceProperties)); + CompactionRunner runner = selector.chooseCompactor(compactionJob); + RecordsProcessed summary = runner.compact(compactionJob); // Then // - Read output file and check that it contains the right results @@ -156,8 +162,10 @@ void shouldMergeFilesCorrectlyAndUpdateStateStoreWithByteArrayKey() throws Excep assignJobIdToInputFiles(stateStore, compactionJob); // When - CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema); - RecordsProcessed summary = compactSortedFiles.compact(compactionJob); + DefaultSelector selector = createCompactionSelector(schema, + HadoopConfigurationProvider.getConfigurationForECS(instanceProperties)); + CompactionRunner runner = selector.chooseCompactor(compactionJob); + RecordsProcessed summary = runner.compact(compactionJob); // Then // - Read output file and check that it contains the right results diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesIteratorIT.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesIteratorIT.java index 99d1592749..f26fd0a18a 100644 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesIteratorIT.java +++ b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesIteratorIT.java @@ -18,6 +18,7 @@ import org.junit.jupiter.api.Test; import sleeper.compaction.job.CompactionJob; +import sleeper.compaction.job.CompactionRunner; import sleeper.compaction.job.execution.testutils.CompactSortedFilesTestBase; import sleeper.compaction.job.execution.testutils.CompactSortedFilesTestData; import sleeper.compaction.job.execution.testutils.CompactSortedFilesTestUtils; @@ -27,6 +28,7 @@ import sleeper.core.record.process.RecordsProcessed; import sleeper.core.schema.Schema; import sleeper.core.statestore.FileReference; +import sleeper.io.parquet.utils.HadoopConfigurationProvider; import java.util.List; @@ -64,8 +66,10 @@ void shouldApplyIteratorDuringCompaction() throws Exception { assignJobIdToInputFiles(stateStore, compactionJob); // When - CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema); - RecordsProcessed summary = compactSortedFiles.compact(compactionJob); + DefaultSelector selector = createCompactionSelector(schema, + HadoopConfigurationProvider.getConfigurationForECS(instanceProperties)); + CompactionRunner runner = selector.chooseCompactor(compactionJob); + RecordsProcessed summary = runner.compact(compactionJob); // Then // - Read output files and check that they contain the right results 
assertThat(summary.getRecordsRead()).isEqualTo(200L); diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesLocalStackIT.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesLocalStackIT.java index 52f512b0c1..bbda0a784b 100644 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesLocalStackIT.java +++ b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesLocalStackIT.java @@ -31,6 +31,7 @@ import software.amazon.awssdk.services.s3.S3AsyncClient; import sleeper.compaction.job.CompactionJob; +import sleeper.compaction.job.CompactionRunner; import sleeper.compaction.job.execution.testutils.CompactSortedFilesTestBase; import sleeper.compaction.job.execution.testutils.CompactSortedFilesTestData; import sleeper.compaction.status.store.job.DynamoDBCompactionJobStatusStoreCreator; @@ -113,10 +114,9 @@ private StateStore createStateStore(Schema schema) { .getStateStore(tableProperties); } - private CompactSortedFiles createCompactSortedFiles(Schema schema, StateStore stateStore) throws Exception { + private DefaultSelector createCompactSortedFiles(Schema schema, StateStore stateStore) throws Exception { tableProperties.setSchema(schema); - return new CompactSortedFiles( - new FixedTablePropertiesProvider(tableProperties), + return new DefaultSelector(new FixedTablePropertiesProvider(tableProperties), new FixedStateStoreProvider(tableProperties, stateStore), ObjectFactory.noUserJars(), configuration); @@ -140,8 +140,9 @@ public void shouldUpdateStateStoreAfterRunningCompactionJob() throws Exception { assignJobIdToInputFiles(stateStore, compactionJob); // When - CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema, stateStore); - RecordsProcessed summary = compactSortedFiles.compact(compactionJob); + DefaultSelector selector = createCompactSortedFiles(schema, stateStore); + CompactionRunner runner = selector.chooseCompactor(compactionJob); + RecordsProcessed summary = runner.compact(compactionJob); // Then // - Read output file and check that it contains the right results diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/ECSCompactionTaskRunnerLocalStackIT.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/ECSCompactionTaskRunnerLocalStackIT.java index 35a38f5e27..31f682dee5 100644 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/ECSCompactionTaskRunnerLocalStackIT.java +++ b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/ECSCompactionTaskRunnerLocalStackIT.java @@ -452,16 +452,15 @@ private CompactionTask createTask(String taskId, StateStoreProvider stateStorePr private CompactionTask createTask( String taskId, StateStoreProvider stateStoreProvider, Supplier jobRunIdSupplier, Supplier timeSupplier) { - CompactSortedFiles compactSortedFiles = new CompactSortedFiles(instanceProperties, - tablePropertiesProvider, stateStoreProvider, - ObjectFactory.noUserJars()); + DefaultSelector selector = new DefaultSelector(tablePropertiesProvider, stateStoreProvider, + ObjectFactory.noUserJars(), configuration); CompactionJobCommitterOrSendToLambda committer = ECSCompactionTaskRunner.committerOrSendToLambda( tablePropertiesProvider, stateStoreProvider, jobStatusStore, 
instanceProperties, sqs); CompactionTask task = new CompactionTask(instanceProperties, PropertiesReloader.neverReload(), new SqsCompactionQueueHandler(sqs, instanceProperties), waitWithRetries(1, stateStoreProvider, tablePropertiesProvider), - compactSortedFiles, committer, jobStatusStore, taskStatusStore, taskId, + committer, jobStatusStore, taskStatusStore, selector, taskId, jobRunIdSupplier, timeSupplier, duration -> { }); return task; diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/testutils/CompactSortedFilesTestBase.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/testutils/CompactSortedFilesTestBase.java index 46bc1a18f1..88b5d69c6a 100644 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/testutils/CompactSortedFilesTestBase.java +++ b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/testutils/CompactSortedFilesTestBase.java @@ -15,11 +15,12 @@ */ package sleeper.compaction.job.execution.testutils; +import org.apache.hadoop.conf.Configuration; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.io.TempDir; import sleeper.compaction.job.CompactionJobFactory; -import sleeper.compaction.job.execution.CompactSortedFiles; +import sleeper.compaction.job.execution.DefaultSelector; import sleeper.configuration.jars.ObjectFactory; import sleeper.configuration.properties.instance.InstanceProperties; import sleeper.configuration.properties.table.FixedTablePropertiesProvider; @@ -66,12 +67,12 @@ protected CompactionJobFactory compactionFactory() { return new CompactionJobFactory(instanceProperties, tableProperties); } - protected CompactSortedFiles createCompactSortedFiles(Schema schema) throws Exception { + protected DefaultSelector createCompactionSelector(Schema schema, Configuration configuration) throws Exception { tableProperties.setSchema(schema); - return new CompactSortedFiles(instanceProperties, - new FixedTablePropertiesProvider(tableProperties), + return new DefaultSelector(new FixedTablePropertiesProvider(tableProperties), new FixedStateStoreProvider(tableProperties, stateStore), - ObjectFactory.noUserJars()); + ObjectFactory.noUserJars(), + configuration); } protected FileReference ingestRecordsGetFile(List records) throws Exception { diff --git a/java/compaction/compaction-rust/pom.xml b/java/compaction/compaction-rust/pom.xml new file mode 100644 index 0000000000..2fce869d50 --- /dev/null +++ b/java/compaction/compaction-rust/pom.xml @@ -0,0 +1,206 @@ + + + + + compaction + sleeper + 0.24.0-SNAPSHOT + + 4.0.0 + compaction-rust + + + + sleeper + compaction-job-creation + ${project.parent.version} + + + sleeper + common-job + ${project.parent.version} + + + sleeper + core + ${project.parent.version} + + + + com.github.jnr + jnr-ffi + ${jnr.ffi.version} + + + + org.scijava + native-lib-loader + ${scijava.native.version} + + + + sleeper + core + ${project.parent.version} + test-jar + test + + + sleeper + configuration + ${project.parent.version} + test-jar + test + + + sleeper + dynamodb-tools + ${project.parent.version} + test-jar + test + + + sleeper + statestore + ${project.parent.version} + test-jar + test + + + org.testcontainers + localstack + test + + + org.testcontainers + testcontainers + test + + + org.testcontainers + junit-jupiter + test + + + org.mockito + mockito-core + test + + + + + + + org.apache.maven.plugins + maven-resources-plugin + ${resources.plugin.version} + + 
copy-resources + + + ${project.build.outputDirectory}/natives/ + + + ${maven.multiModuleProjectDirectory}/../rust/target/ + + release/*.dylib + release/*.dll + release/*.so + */release/*.dylib + */release/*.dll + */release/*.so + debug/*.dylib + debug/*.dll + debug/*.so + */debug/*.dylib + */debug/*.dll + */debug/*.so + + false + + + UTF-8 + + + + + + + + + skipRust + !true + + + rust + + + + org.codehaus.mojo + exec-maven-plugin + ${exec.plugin.version} + + + + Invoke Rust Cargo build (Linux x86_64) + generate-resources + + exec + + + cross + ${maven.multiModuleProjectDirectory}/../rust + + build + --release + --target + x86_64-unknown-linux-gnu + + + + + Invoke Rust Cargo build (Linux Aarch64) + generate-resources + + exec + + + cross + ${maven.multiModuleProjectDirectory}/../rust + + build + --release + --target + aarch64-unknown-linux-gnu + + + -Ctarget-feature=+lse -Ctarget-cpu=neoverse-n1 + + + + + + + + + + + + \ No newline at end of file diff --git a/java/compaction/compaction-rust/src/main/java/sleeper/compaction/job/execution/RustBridge.java b/java/compaction/compaction-rust/src/main/java/sleeper/compaction/job/execution/RustBridge.java new file mode 100644 index 0000000000..ad7e52f6a6 --- /dev/null +++ b/java/compaction/compaction-rust/src/main/java/sleeper/compaction/job/execution/RustBridge.java @@ -0,0 +1,376 @@ +/* + * Copyright 2022-2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package sleeper.compaction.job.execution; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import jnr.ffi.LibraryLoader; +import jnr.ffi.NativeType; +import jnr.ffi.Struct; +import jnr.ffi.annotations.In; +import jnr.ffi.annotations.Out; +import org.scijava.nativelib.JniExtractor; +import org.scijava.nativelib.NativeLoader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Objects; + +public class RustBridge { + /** + * Native library extraction object. This can extract native libraries from the classpath and + * unpack them to a temporary directory. + */ + private static final JniExtractor EXTRACTOR = NativeLoader.getJniExtractor(); + + /** Paths in the JAR file where a native library may have been placed. */ + private static final String[] LIB_PATHS = {"natives/release", + "natives/x86_64-unknown-linux-gnu/release", "natives/aarch64-unknown-linux-gnu/release", + // Rust debug builds will place libraries in different locations + "natives/x86_64-unknown-linux-gnu/debug", "natives/aarch64-unknown-linux-gnu/debug"}; + + private static final Logger LOGGER = LoggerFactory.getLogger(RustBridge.class); + + private static Compaction nativeCompaction = null; + + /** + * Attempt to load the native compaction library. + * + * The native library will be extracted from the classpath and unpacked to a temporary + * directory. The library is then loaded and linked. 
+     * Multiple locations are checked in the classpath, representing different architectures.
+     * Thus, if we attempt to load a library for the wrong CPU architecture, loading will fail
+     * and the next path will be tried. This way, we maintain a single JAR file that can work
+     * across multiple CPU architectures.
+     *
+     * @return the native compaction object
+     * @throws IOException if an error occurs during loading or linking the native library
+     */
+    public static synchronized Compaction getRustCompactor() throws IOException {
+        try {
+            Compaction nativeLib;
+
+            if (nativeCompaction == null) {
+                nativeLib = extractAndLink(Compaction.class, "compaction");
+                nativeCompaction = nativeLib;
+            } else {
+                nativeLib = nativeCompaction;
+            }
+
+            return nativeLib;
+
+        } catch (UnsatisfiedLinkError err) {
+            throw (IOException) new IOException().initCause(err);
+        }
+    }
+
+    /**
+     * Loads the named library after extracting it from the classpath.
+     *
+     * This function extracts the named library from a JAR on the classpath and attempts to load it
+     * and bind it to the given interface class. The paths in the array {@link #LIB_PATHS} are tried
+     * in order. If a library is found at a path, this method will attempt to load it. If no library
+     * is found on the classpath or it can't be loaded (e.g. wrong binary format), the next path
+     * will be tried.
+     *
+     * The library name should be given without platform prefixes, e.g. "foo" will be expanded into
+     * "libfoo.so" or "foo.dll" as appropriate for this platform.
+     *
+     * @param  <T>                  the Java interface type for the native library
+     * @param  clazz                the Java interface class for the native library
+     * @param  libName              the library name to extract, without platform prefixes
+     * @return                      the loaded and linked library interface
+     * @throws IOException          if an error occurred during file extraction
+     * @throws UnsatisfiedLinkError if the library could not be found or loaded
+     */
+    public static <T> T extractAndLink(Class<T> clazz, String libName) throws IOException {
+        // Work through each potential path to see if we can load the library
+        // successfully
+        for (String path : LIB_PATHS) {
+            LOGGER.debug("Attempting to load native library from JAR path {}", path);
+            // Attempt extraction
+            File extractedLib = EXTRACTOR.extractJni(path, libName);
+
+            // If file located, attempt to load
+            if (extractedLib != null) {
+                LOGGER.debug("Extracted file is at {}", extractedLib);
+                try {
+                    return LibraryLoader.create(clazz).failImmediately()
+                            .load(extractedLib.getAbsolutePath());
+                } catch (UnsatisfiedLinkError e) {
+                    // wrong library, try the next path
+                    LOGGER.error("Unable to load native library from " + path, e);
+                }
+            }
+        }
+
+        // No matches
+        throw new UnsatisfiedLinkError("Couldn't locate or load " + libName);
+    }
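+    // Usage sketch (mirrors RustCompaction.invokeRustFFI later in this change): obtain the
+    // linked interface once, and pair every allocate_result() with free_result(), since the
+    // result struct lives in native memory rather than on the Java heap:
+    //
+    //     Compaction lib = RustBridge.getRustCompactor();
+    //     FFICompactionResult res = lib.allocate_result();
+    //     try {
+    //         int rc = lib.ffi_merge_sorted_files(params, res); // params: a populated FFICompactionParams
+    //     } finally {
+    //         lib.free_result(res);
+    //     }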
+    /**
+     * The compaction input data that will be populated from the Java side. If you update
+     * this struct (field ordering, types, etc.), you MUST update the corresponding Rust definition
+     * in rust/compaction/src/lib.rs.
+     */
+    @SuppressWarnings(value = {"checkstyle:membername"})
+    @SuppressFBWarnings(value = {"URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD"})
+    public static class FFICompactionParams extends Struct {
+        /** Array of input files to compact. */
+        public final Array<java.lang.String> input_files = new Array<>(this);
+        /** Output file name. */
+        public final Struct.UTF8StringRef output_file = new Struct.UTF8StringRef();
+        /** Names of Sleeper row key columns from schema. */
+        public final Array<java.lang.String> row_key_cols = new Array<>(this);
+        /** Types for region schema: 1 = Int, 2 = Long, 3 = String, 4 = Byte array. */
+        public final Array<Integer> row_key_schema = new Array<>(this);
+        /** Names of Sleeper sort key columns from schema. */
+        public final Array<java.lang.String> sort_key_cols = new Array<>(this);
+        /** Maximum size of output Parquet row group in rows. */
+        public final Struct.size_t max_row_group_size = new Struct.size_t();
+        /** Maximum output Parquet page size in bytes. */
+        public final Struct.size_t max_page_size = new Struct.size_t();
+        /** Output Parquet compression codec. */
+        public final Struct.UTF8StringRef compression = new Struct.UTF8StringRef();
+        /** Output Parquet writer version. Must be 1.0 or 2.0. */
+        public final Struct.UTF8StringRef writer_version = new Struct.UTF8StringRef();
+        /** Column min/max values truncation length in output Parquet. */
+        public final Struct.size_t column_truncate_length = new Struct.size_t();
+        /** Maximum size of statistics block in output Parquet. */
+        public final Struct.size_t stats_truncate_length = new Struct.size_t();
+        /** Should row key columns use dictionary encoding in output Parquet. */
+        public final Struct.Boolean dict_enc_row_keys = new Struct.Boolean();
+        /** Should sort key columns use dictionary encoding in output Parquet. */
+        public final Struct.Boolean dict_enc_sort_keys = new Struct.Boolean();
+        /** Should value columns use dictionary encoding in output Parquet. */
+        public final Struct.Boolean dict_enc_values = new Struct.Boolean();
+        /** Compaction partition region minimums. MUST BE SAME LENGTH AS row_key_cols. */
+        public final Array<Object> region_mins = new Array<>(this);
+        /** Compaction partition region maximums. MUST BE SAME LENGTH AS row_key_cols. */
+        public final Array<Object> region_maxs = new Array<>(this);
+        /** Whether compaction partition region minimums are inclusive. MUST BE SAME LENGTH AS row_key_cols. */
+        public final Array<Boolean> region_mins_inclusive = new Array<>(this);
+        /** Whether compaction partition region maximums are inclusive. MUST BE SAME LENGTH AS row_key_cols. */
+        public final Array<Boolean> region_maxs_inclusive = new Array<>(this);
+
+        public FFICompactionParams(jnr.ffi.Runtime runtime) {
+            super(runtime);
+        }
+
+        /**
+         * Validate state of struct.
+         *
+         * @throws IllegalStateException when an invariant fails
+         */
+        public void validate() {
+            input_files.validate();
+            row_key_cols.validate();
+            row_key_schema.validate();
+            sort_key_cols.validate();
+            region_mins.validate();
+            region_maxs.validate();
+            region_mins_inclusive.validate();
+            region_maxs_inclusive.validate();
+
+            // Check strings non null
+            Objects.requireNonNull(output_file.get(), "Output file is null");
+            Objects.requireNonNull(writer_version.get(), "Parquet writer version is null");
+            Objects.requireNonNull(compression.get(), "Parquet compression codec is null");
+
+            // Check lengths
+            long rowKeys = row_key_cols.len.get();
+            if (rowKeys != row_key_schema.len.get()) {
+                throw new IllegalStateException("row key schema array has length " + row_key_schema.len.get() + " but there are " + rowKeys + " row key columns");
+            }
+            if (rowKeys != region_mins.len.get()) {
+                throw new IllegalStateException("region mins has length " + region_mins.len.get() + " but there are " + rowKeys + " row key columns");
+            }
+            if (rowKeys != region_maxs.len.get()) {
+                throw new IllegalStateException("region maxs has length " + region_maxs.len.get() + " but there are " + rowKeys + " row key columns");
+            }
+            if (rowKeys != region_mins_inclusive.len.get()) {
+                throw new IllegalStateException("region mins inclusives has length " + region_mins_inclusive.len.get() + " but there are " + rowKeys + " row key columns");
+            }
+            if (rowKeys != region_maxs_inclusive.len.get()) {
+                throw new IllegalStateException("region maxs inclusives has length " + region_maxs_inclusive.len.get() + " but there are " + rowKeys + " row key columns");
+            }
+        }
+    }
+
+    /**
+     * Array class that can be inside a Struct. Creates a dynamic array that can be passed to C.
+     * Strong references are maintained for allocated memory so GC will dispose of memory when
+     * this object is collected.
+     *
+     * @param <T> object type of array
+     */
+    public static class Array<T> {
+        // Length of array
+        public final Struct.size_t len;
+        // Pointer to base of dynamically allocated array
+        public final Struct.Pointer arrayBase;
+        // Reference to dynamically allocated array to prevent GC until Array instance is collected
+        public jnr.ffi.Pointer basePtr;
+        // Reference to dynamically allocated items to prevent GC until Array instance is collected
+        public jnr.ffi.Pointer[] items;
+
+        public Array(Struct enclosing) {
+            this.len = enclosing.new size_t();
+            this.arrayBase = enclosing.new Pointer();
+        }
+
+        /**
+         * Create a dynamic array of items in this array.
+         *
+         * A base pointer is allocated and its entries are set to point to other
+         * dynamically allocated memory containing the items from the array.
+         *
+         * @param arr          array data
+         * @param nullsAllowed if null pointers are allowed in the data array
+         */
+        public void populate(final T[] arr, boolean nullsAllowed) {
+            final jnr.ffi.Runtime r = len.struct().getRuntime();
+            // Calculate size needed for array of pointers
+            int ptrSize = r.findType(NativeType.ADDRESS).size();
+            // Zero length arrays get null pointers (see else branch)
+            if (arr.length > 0) {
+                int size = arr.length * ptrSize;
+                // Allocate some memory for pointers
+                this.basePtr = r.getMemoryManager().allocateDirect(size);
+                this.arrayBase.set(basePtr);
+
+                this.items = new jnr.ffi.Pointer[arr.length];
+
+                for (int i = 0; i < arr.length; i++) {
+                    if (!nullsAllowed && arr[i] == null) {
+                        throw new NullPointerException("Index " + i + " of array is null when nulls aren't allowed here");
+                    }
+                    setValue(arr[i], i, r);
+                }
+
+                // Bulk set the pointers in the base array
+                this.basePtr.put(0, this.items, 0, this.items.length);
+            } else {
+                this.basePtr = null;
+                this.items = null;
+            }
+
+            // Set length of array in struct
+            this.len.set(arr.length);
+        }
+
+        /**
+         * Check class invariants.
+         *
+         * @throws IllegalStateException if a violation is found
+         */
+        public void validate() {
+            if (len.get() == 0) {
+                if (basePtr != null || items != null) {
+                    throw new IllegalStateException("array length is 0 but pointers not null");
+                }
+            } else {
+                if (len.get() != items.length) {
+                    throw new IllegalStateException("length of " + len.get() + " doesn't match items length of " + items.length);
+                }
+                Objects.requireNonNull(this.basePtr, "base pointer is null");
+                Objects.requireNonNull(this.items, "items array is null");
+                if (this.arrayBase.get().address() != this.basePtr.address()) {
+                    throw new IllegalStateException("array base pointer and stored base pointer differ!");
+                }
+            }
+        }
+
+        /**
+         * Sets a given position in the array to a specific value. The data
+         * is byte encoded.
+         *
+         * Intended for internal use only.
+         *
+         * @param  <E>                        item type: must be Integer, Long, String, byte[] or Boolean
+         * @param  item                       the item to encode
+         * @param  idx                        array position to use
+         * @param  r                          struct runtime
+         * @throws ClassCastException         if item is of wrong class
+         * @throws IndexOutOfBoundsException  if idx is invalid
+         */
+        protected <E> void setValue(E item, int idx, jnr.ffi.Runtime r) {
+            if (item == null) {
+                this.items[idx] = jnr.ffi.Pointer.wrap(r, 0);
+            } else if (item instanceof Integer) {
+                int e = (int) item;
+                this.items[idx] = r.getMemoryManager().allocateDirect(r.findType(NativeType.SINT).size());
+                this.items[idx].putInt(0, e);
+            } else if (item instanceof Long) {
+                long e = (long) item;
+                this.items[idx] = r.getMemoryManager().allocateDirect(r.findType(NativeType.SLONGLONG).size());
+                this.items[idx].putLong(0, e);
+            } else if (item instanceof java.lang.String) {
+                // Strings are encoded as 4 byte length then value
+                java.lang.String e = (java.lang.String) item;
+                byte[] utf8string = e.getBytes(StandardCharsets.UTF_8);
+                // Add four for length
+                int stringSize = utf8string.length + 4;
+                // Allocate memory for string and write length then the string
+                this.items[idx] = r.getMemoryManager().allocateDirect(stringSize);
+                this.items[idx].putInt(0, utf8string.length);
+                this.items[idx].put(4, utf8string, 0, utf8string.length);
+            } else if (item instanceof byte[]) {
+                // Byte arrays are encoded as 4 byte length then value
+                byte[] e = (byte[]) item;
+                int byteSize = e.length + 4;
+                this.items[idx] = r.getMemoryManager().allocateDirect(byteSize);
+                this.items[idx].putInt(0, e.length);
+                this.items[idx].put(4, e, 0, e.length);
+            } else if (item instanceof Boolean) {
+                boolean e = (boolean) item;
+                this.items[idx] = r.getMemoryManager().allocateDirect(1);
+                this.items[idx].putByte(0, e ? (byte) 1 : (byte) 0);
+            } else {
+                throw new ClassCastException("Can't cast " + item.getClass() + " to a valid Sleeper row key type");
+            }
+        }
+    }
+
+    /**
+     * The compaction output data that the native code will populate.
+     */
+    @SuppressWarnings(value = {"checkstyle:membername", "checkstyle:parametername"})
+    @SuppressFBWarnings(value = {"URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD"})
+    public static class FFICompactionResult extends Struct {
+        public final Struct.size_t rows_read = new Struct.size_t();
+        public final Struct.size_t rows_written = new Struct.size_t();
+
+        public FFICompactionResult(jnr.ffi.Runtime runtime) {
+            super(runtime);
+        }
+    }
+
+    /**
+     * The interface for the native library we are calling.
+     */
+    public interface Compaction {
+        FFICompactionResult allocate_result();
+
+        void free_result(@In FFICompactionResult res);
+
+        @SuppressWarnings(value = "checkstyle:parametername")
+        int ffi_merge_sorted_files(@In FFICompactionParams input, @Out FFICompactionResult result);
+    }
+
+    private RustBridge() {
+    }
+}
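For reference, the 4-byte-length-then-bytes layout that Array.setValue writes for strings and byte arrays can be reproduced with plain jnr-ffi calls. The following standalone sketch is illustrative only and not part of the change; the class name EncodingSketch is invented for the example:

    import jnr.ffi.Pointer;
    import jnr.ffi.Runtime;

    import java.nio.charset.StandardCharsets;

    public class EncodingSketch {
        public static void main(String[] args) {
            Runtime r = Runtime.getSystemRuntime();
            byte[] utf8 = "key1".getBytes(StandardCharsets.UTF_8);
            // Allocate native memory: 4 bytes for the length prefix, then the payload
            Pointer p = r.getMemoryManager().allocateDirect(4 + utf8.length);
            p.putInt(0, utf8.length);
            p.put(4, utf8, 0, utf8.length);
            // Read it back the way the native side would
            int len = p.getInt(0);
            byte[] out = new byte[len];
            p.get(4, out, 0, len);
            System.out.println(new String(out, StandardCharsets.UTF_8)); // prints "key1"
        }
    }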
diff --git a/java/compaction/compaction-rust/src/main/java/sleeper/compaction/job/execution/RustCompaction.java b/java/compaction/compaction-rust/src/main/java/sleeper/compaction/job/execution/RustCompaction.java
new file mode 100644
index 0000000000..5b433d77d2
--- /dev/null
+++ b/java/compaction/compaction-rust/src/main/java/sleeper/compaction/job/execution/RustCompaction.java
@@ -0,0 +1,229 @@
+/*
+ * Copyright 2022-2024 Crown Copyright
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package sleeper.compaction.job.execution;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import sleeper.compaction.job.CompactionJob;
+import sleeper.compaction.job.CompactionRunner;
+import sleeper.compaction.job.execution.RustBridge.FFICompactionParams;
+import sleeper.configuration.properties.table.TableProperties;
+import sleeper.configuration.properties.table.TablePropertiesProvider;
+import sleeper.core.range.Range;
+import sleeper.core.range.Region;
+import sleeper.core.record.process.RecordsProcessed;
+import sleeper.core.schema.Schema;
+import sleeper.core.schema.type.ByteArrayType;
+import sleeper.core.schema.type.IntType;
+import sleeper.core.schema.type.LongType;
+import sleeper.core.schema.type.PrimitiveType;
+import sleeper.core.schema.type.StringType;
+import sleeper.core.statestore.StateStore;
+import sleeper.statestore.StateStoreProvider;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+
+import static sleeper.configuration.properties.table.TableProperty.COLUMN_INDEX_TRUNCATE_LENGTH;
+import static sleeper.configuration.properties.table.TableProperty.COMPRESSION_CODEC;
+import static sleeper.configuration.properties.table.TableProperty.DICTIONARY_ENCODING_FOR_ROW_KEY_FIELDS;
+import static sleeper.configuration.properties.table.TableProperty.DICTIONARY_ENCODING_FOR_SORT_KEY_FIELDS;
+import static sleeper.configuration.properties.table.TableProperty.DICTIONARY_ENCODING_FOR_VALUE_FIELDS;
+import static sleeper.configuration.properties.table.TableProperty.PAGE_SIZE;
+import static sleeper.configuration.properties.table.TableProperty.PARQUET_WRITER_VERSION;
+import static sleeper.configuration.properties.table.TableProperty.STATISTICS_TRUNCATE_LENGTH;
+
+public class RustCompaction implements CompactionRunner {
+    private final TablePropertiesProvider tablePropertiesProvider;
+    private final StateStoreProvider stateStoreProvider;
+
+    /** Maximum number of rows in a Parquet row group. */
+    private static final long RUST_MAX_ROW_GROUP_ROWS = 1_000_000;
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RustCompaction.class);
+
+    public RustCompaction(TablePropertiesProvider tablePropertiesProvider,
+            StateStoreProvider stateStoreProvider) {
+        this.tablePropertiesProvider = tablePropertiesProvider;
+        this.stateStoreProvider = stateStoreProvider;
+    }
+
+    @Override
+    public RecordsProcessed compact(CompactionJob job) throws Exception {
+        TableProperties tableProperties = tablePropertiesProvider
+                .getById(job.getTableId());
+        Schema schema = tableProperties.getSchema();
+        StateStore stateStore = stateStoreProvider.getStateStore(tableProperties);
+        Region region = stateStore.getAllPartitions().stream()
+                .filter(p -> Objects.equals(job.getPartitionId(), p.getId()))
+                .findFirst().orElseThrow(() -> new NoSuchElementException("Partition not found for compaction job"))
+                .getRegion();
+
+        // Obtain the native library.
+        // This throws an exception if the native library can't be loaded and linked
+        RustBridge.Compaction nativeLib = RustBridge.getRustCompactor();
+        jnr.ffi.Runtime runtime = jnr.ffi.Runtime.getRuntime(nativeLib);
+
+        FFICompactionParams params = createFFIParams(job, tableProperties, schema, region, runtime);
+
+        RecordsProcessed result = invokeRustFFI(job, nativeLib, params);
+
+        LOGGER.info("Compaction job {}: compaction finished at {}", job.getId(),
+                LocalDateTime.now());
+        return result;
+    }
+
+    /**
+     * Creates the input struct that contains all the information needed by the Rust
+     * side of the compaction.
+     *
+     * This includes all Parquet writer settings as well as compaction data such as the
+     * input files and the compaction region.
+     *
+     * @param  job             compaction job
+     * @param  tableProperties table configuration for this table
+     * @param  schema          the table schema
+     * @param  region          region being compacted
+     * @param  runtime         FFI runtime
+     * @return                 object to pass to FFI layer
+     */
+    @SuppressWarnings(value = "checkstyle:avoidNestedBlocks")
+    public static FFICompactionParams createFFIParams(CompactionJob job, TableProperties tableProperties, Schema schema,
+            Region region, jnr.ffi.Runtime runtime) {
+        FFICompactionParams params = new FFICompactionParams(runtime);
+        params.input_files.populate(job.getInputFiles().toArray(new String[0]), false);
+        params.output_file.set(job.getOutputFile());
+        params.row_key_cols.populate(schema.getRowKeyFieldNames().toArray(new String[0]), false);
+        params.row_key_schema.populate(getKeyTypes(schema.getRowKeyTypes()), false);
+        params.sort_key_cols.populate(schema.getSortKeyFieldNames().toArray(new String[0]), false);
+        params.max_row_group_size.set(RUST_MAX_ROW_GROUP_ROWS);
+        params.max_page_size.set(tableProperties.getInt(PAGE_SIZE));
+        params.compression.set(tableProperties.get(COMPRESSION_CODEC));
+        params.writer_version.set(tableProperties.get(PARQUET_WRITER_VERSION));
+        params.column_truncate_length.set(tableProperties.getInt(COLUMN_INDEX_TRUNCATE_LENGTH));
+        params.stats_truncate_length.set(tableProperties.getInt(STATISTICS_TRUNCATE_LENGTH));
+        params.dict_enc_row_keys.set(tableProperties.getBoolean(DICTIONARY_ENCODING_FOR_ROW_KEY_FIELDS));
+        params.dict_enc_sort_keys.set(tableProperties.getBoolean(DICTIONARY_ENCODING_FOR_SORT_KEY_FIELDS));
+        params.dict_enc_values.set(tableProperties.getBoolean(DICTIONARY_ENCODING_FOR_VALUE_FIELDS));
+        // Extra braces: make sure the wrong array isn't populated to the wrong pointers
+        {
+            // This array can't contain nulls
+            Object[] regionMins = region.getRanges().stream().map(Range::getMin).toArray();
+            params.region_mins.populate(regionMins, false);
+        }
+        {
+            Boolean[] regionMinInclusives = region.getRanges().stream().map(Range::isMinInclusive)
+                    .toArray(Boolean[]::new);
+            params.region_mins_inclusive.populate(regionMinInclusives, false);
+        }
+        {
+            // This array can contain nulls
+            Object[] regionMaxs = region.getRanges().stream().map(Range::getMax).toArray();
+            params.region_maxs.populate(regionMaxs, true);
+        }
+        {
+            Boolean[] regionMaxInclusives = region.getRanges().stream().map(Range::isMaxInclusive)
+                    .toArray(Boolean[]::new);
+            params.region_maxs_inclusive.populate(regionMaxInclusives, false);
+        }
+        params.validate();
+        return params;
+    }
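+
+    // Example of the mapping below: a schema with row key columns (String key, Long timestamp)
+    // gives row_key_schema = [3, 2].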
+    /**
+     * Convert a list of Sleeper primitive types to a number indicating their type
+     * for FFI translation.
+     *
+     * @param  keyTypes              list of primitive types of columns
+     * @return                       array of type IDs
+     * @throws IllegalStateException if unsupported type found
+     */
+    public static Integer[] getKeyTypes(List<PrimitiveType> keyTypes) {
+        return keyTypes.stream().mapToInt(type -> {
+            if (type instanceof IntType) {
+                return 1;
+            } else if (type instanceof LongType) {
+                return 2;
+            } else if (type instanceof StringType) {
+                return 3;
+            } else if (type instanceof ByteArrayType) {
+                return 4;
+            } else {
+                throw new IllegalStateException("Unsupported column type found " + type.getClass());
+            }
+        }).boxed()
+                .toArray(Integer[]::new);
+    }
+
+    /**
+     * Take the compaction parameters and invoke the Rust compactor using the FFI
+     * bridge.
+     *
+     * @param  job              the compaction job
+     * @param  nativeLib        the native library implementing the FFI bridge
+     * @param  compactionParams the compaction input parameters
+     * @return                  records read/written
+     * @throws IOException      if the Rust library doesn't complete successfully
+     */
+    public static RecordsProcessed invokeRustFFI(CompactionJob job, RustBridge.Compaction nativeLib,
+            FFICompactionParams compactionParams) throws IOException {
+        // Create object to hold the result (in native memory)
+        RustBridge.FFICompactionResult compactionData = nativeLib.allocate_result();
+        try {
+            LOGGER.info("Invoking native Rust compaction...");
+
+            // Perform compaction
+            int result = nativeLib.ffi_merge_sorted_files(compactionParams, compactionData);
+
+            // Check result
+            if (result != 0) {
+                LOGGER.error("Rust compaction failed, return code: {}", result);
+                throw new IOException("Rust compaction failed with return code " + result);
+            }
+
+            long totalNumberOfRecordsRead = compactionData.rows_read.get();
+            long recordsWritten = compactionData.rows_written.get();
+
+            LOGGER.info("Compaction job {}: Read {} records and wrote {} records",
+                    job.getId(), totalNumberOfRecordsRead, recordsWritten);
+
+            return new RecordsProcessed(totalNumberOfRecordsRead, recordsWritten);
+        } finally {
+            // Ensure de-allocation
+            nativeLib.free_result(compactionData);
+        }
+    }
+
+    @Override
+    public String implementationLanguage() {
+        return "Rust";
+    }
+
+    @Override
+    public boolean isHardwareAccelerated() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsIterators() {
+        return false;
+    }
+}
diff --git a/java/compaction/pom.xml b/java/compaction/pom.xml
index 5a678d8bcd..5c87e0b311 100644
--- a/java/compaction/pom.xml
+++ b/java/compaction/pom.xml
@@ -31,6 +31,7 @@
         <module>compaction-job-creation</module>
         <module>compaction-job-creation-lambda</module>
         <module>compaction-job-execution</module>
+        <module>compaction-rust</module>
         <module>compaction-status-store</module>
         <module>compaction-task-creation</module>
diff --git a/java/configuration/src/main/java/sleeper/configuration/properties/instance/CompactionProperty.java b/java/configuration/src/main/java/sleeper/configuration/properties/instance/CompactionProperty.java
index b633f18110..ab26f073a7 100644
--- a/java/configuration/src/main/java/sleeper/configuration/properties/instance/CompactionProperty.java
+++ b/java/configuration/src/main/java/sleeper/configuration/properties/instance/CompactionProperty.java
@@ -251,6 +251,12 @@ public interface CompactionProperty {
             .validationPredicate(Utils::isPositiveInteger)
             .propertyGroup(InstancePropertyGroup.COMPACTION).build();
 
+    UserDefinedInstanceProperty DEFAULT_COMPACTION_METHOD = Index.propertyBuilder("sleeper.default.table.compaction.method")
+            .description("Select what compaction method to use on a table. Current options are JAVA and RUST. Rust compaction support is " +
+                    "experimental.")
+            .defaultValue("JAVA")
+            .propertyGroup(InstancePropertyGroup.COMPACTION).build();
+
     static List<UserDefinedInstanceProperty> getAll() {
         return Index.INSTANCE.getAll();
     }
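The per-table property added below defaults to this new instance-level property. How a runner is chosen from it is not shown in this diff (presumably by the DefaultSelector wired into CompactSortedFilesTestBase above); the following is a hedged sketch of that dispatch, with MethodDispatchSketch and javaRunner invented for illustration:

    import sleeper.compaction.job.CompactionRunner;
    import sleeper.configuration.properties.table.TableProperties;

    import static sleeper.configuration.properties.table.TableProperty.COMPACTION_METHOD;

    // Hypothetical sketch, not the real DefaultSelector: pick a runner per table
    public class MethodDispatchSketch {
        private final CompactionRunner javaRunner; // assumed existing Java implementation
        private final CompactionRunner rustRunner; // e.g. a RustCompaction instance

        public MethodDispatchSketch(CompactionRunner javaRunner, CompactionRunner rustRunner) {
            this.javaRunner = javaRunner;
            this.rustRunner = rustRunner;
        }

        public CompactionRunner chooseRunner(TableProperties tableProperties) {
            String method = tableProperties.get(COMPACTION_METHOD);
            return "RUST".equalsIgnoreCase(method) ? rustRunner : javaRunner;
        }
    }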
diff --git a/java/configuration/src/main/java/sleeper/configuration/properties/table/TableProperty.java b/java/configuration/src/main/java/sleeper/configuration/properties/table/TableProperty.java
index 61b2b9e69e..a643fd5181 100644
--- a/java/configuration/src/main/java/sleeper/configuration/properties/table/TableProperty.java
+++ b/java/configuration/src/main/java/sleeper/configuration/properties/table/TableProperty.java
@@ -30,6 +30,7 @@ import static sleeper.configuration.Utils.describeEnumValuesInLowerCase;
 import static sleeper.configuration.properties.instance.CompactionProperty.DEFAULT_COMPACTION_FILES_BATCH_SIZE;
 import static sleeper.configuration.properties.instance.CompactionProperty.DEFAULT_COMPACTION_JOB_SEND_BATCH_SIZE;
+import static sleeper.configuration.properties.instance.CompactionProperty.DEFAULT_COMPACTION_METHOD;
 import static sleeper.configuration.properties.instance.CompactionProperty.DEFAULT_COMPACTION_STRATEGY_CLASS;
 import static sleeper.configuration.properties.instance.CompactionProperty.DEFAULT_SIZERATIO_COMPACTION_STRATEGY_MAX_CONCURRENT_JOBS_PER_PARTITION;
 import static sleeper.configuration.properties.instance.CompactionProperty.DEFAULT_SIZERATIO_COMPACTION_STRATEGY_RATIO;
@@ -247,6 +248,14 @@
             "concurrently per partition.")
             .propertyGroup(TablePropertyGroup.COMPACTION)
             .build();
+
+    TableProperty COMPACTION_METHOD = Index.propertyBuilder("sleeper.table.compaction.method")
+            .defaultProperty(DEFAULT_COMPACTION_METHOD)
+            .description("Select what compaction method to use on a table. Current options are JAVA and RUST. Rust compaction support is " +
+                    "experimental.")
+            .propertyGroup(TablePropertyGroup.COMPACTION)
+            .build();
+
     TableProperty STATESTORE_CLASSNAME = Index.propertyBuilder("sleeper.table.statestore.classname")
             .defaultValue("sleeper.statestore.transactionlog.DynamoDBTransactionLogStateStore")
             .description("The name of the class used for the state store. " +
diff --git a/java/pom.xml b/java/pom.xml
index 7ae4139ac3..174e46e763 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -161,6 +161,8 @@
         1.5.2
         1.14.0
         2.12.7
+        <jnr.ffi.version>2.2.16</jnr.ffi.version>
+        <scijava.native.version>2.5.0</scijava.native.version>
         5.10.3
         1.10.3
@@ -196,6 +198,8 @@
         1.2.8
         10.0.2
         3.12.1
+        <exec.plugin.version>3.2.0</exec.plugin.version>
+        <resources.plugin.version>3.3.1</resources.plugin.version>