docs(spark): add substrait-spark usage examples #293

Merged · 8 commits · Oct 4, 2024

2 changes: 1 addition & 1 deletion .editorconfig
@@ -10,7 +10,7 @@ trim_trailing_whitespace = true
[*.{yaml,yml}]
indent_size = 2

-[{**/*.sql,**/OuterReferenceResolver.md,gradlew.bat}]
+[{**/*.sql,**/OuterReferenceResolver.md,**gradlew.bat}]
charset = unset
end_of_line = unset
insert_final_newline = unset
20 changes: 20 additions & 0 deletions .github/workflows/pr.yml
@@ -86,6 +86,26 @@ jobs:
        uses: gradle/actions/setup-gradle@v3
      - name: Build with Gradle
        run: gradle build --rerun-tasks
  examples:
    name: Build Examples
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          submodules: recursive
      - name: Set up JDK 17
        uses: actions/setup-java@v4
        with:
          java-version: '17'
          distribution: 'temurin'
      - uses: extractions/setup-just@v2
      - name: substrait-spark
        shell: bash
        run: |
          pwd
          ls -lart
          just -f ./examples/substrait-spark/justfile buildapp

  isthmus-native-image-mac-linux:
    name: Build Isthmus Native Image
    needs: java
3 changes: 3 additions & 0 deletions .gitignore
@@ -8,3 +8,6 @@ gen
out/**
*.iws
.vscode
.pmdCache

*/bin
5 changes: 5 additions & 0 deletions examples/substrait-spark/.gitignore
@@ -0,0 +1,5 @@
spark-warehouse
derby.log
_apps
_data
bin
569 changes: 569 additions & 0 deletions examples/substrait-spark/README.md

Large diffs are not rendered by default.

40 changes: 40 additions & 0 deletions examples/substrait-spark/build.gradle.kts
@@ -0,0 +1,40 @@
plugins {
  // Apply the application plugin to add support for building a CLI application in Java.
  id("java")
  id("com.diffplug.spotless") version "6.19.0"
}

repositories {
  // Use Maven Central for resolving dependencies.
  mavenCentral()
}

dependencies {
  implementation("org.apache.spark:spark-core_2.12:3.5.1")
  implementation("io.substrait:spark:0.36.0")
  implementation("io.substrait:core:0.36.0")
  implementation("org.apache.spark:spark-sql_2.12:3.5.1")

  // For a real Spark application, these would not be required since they would be in the Spark
  // server classpath
  runtimeOnly("org.apache.spark:spark-core_2.12:3.5.1")
  runtimeOnly("org.apache.spark:spark-hive_2.12:3.5.1")
}

tasks.jar {
  isZip64 = true
  exclude("META-INF/*.RSA")
  exclude("META-INF/*.SF")
  exclude("META-INF/*.DSA")

  duplicatesStrategy = DuplicatesStrategy.EXCLUDE
  manifest.attributes["Main-Class"] = "io.substrait.examples.App"
  from(configurations.runtimeClasspath.get().map({ if (it.isDirectory) it else zipTree(it) }))
}

tasks.named<Test>("test") {
  // Use JUnit Platform for unit tests.
  useJUnitPlatform()
}

java { toolchain { languageVersion.set(JavaLanguageVersion.of(17)) } }
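Note on the jar task above: it builds a single self-contained jar by merging the whole runtime classpath, sets Main-Class so the artifact can be handed straight to spark-submit, and strips the META-INF signature files (*.RSA, *.SF, *.DSA) of merged dependencies, since signatures copied out of signed upstream jars would no longer match the merged contents and would break jar verification.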
32 changes: 32 additions & 0 deletions examples/substrait-spark/docker-compose.yaml
@@ -0,0 +1,32 @@
services:
  spark:
    image: docker.io/bitnami/spark:3.5
    user: ":${MY_GID}"
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
      - SPARK_USER=spark
    ports:
      - '8080:8080'
    volumes:
      - ./_apps:/opt/spark-apps
      - ./_data:/opt/spark-data
  spark-worker:
    image: docker.io/bitnami/spark:3.5
    user: ":${MY_GID}"
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark:7077
      - SPARK_WORKER_MEMORY=1G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
      - SPARK_USER=spark
    volumes:
      - ./_apps:/opt/spark-apps
      - ./_data:/opt/spark-data
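Both containers run with the caller's group id via user: ":${MY_GID}", so the bind-mounted _apps and _data directories remain writable from inside the containers; the spark recipe in the justfile below exports MY_GID before bringing the cluster up.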
60 changes: 60 additions & 0 deletions examples/substrait-spark/justfile
@@ -0,0 +1,60 @@
# Main justfile to run all the development scripts
# To install 'just' see https://github.com/casey/just#installation

# Ensure all properties are exported as shell env-vars
set export
set dotenv-load

# set the current directory, and the location of the test data
CWDIR := justfile_directory()

SPARK_VERSION := "3.5.1"

SPARK_MASTER_CONTAINER := "substrait-spark-spark-1"

_default:
    @just -f {{justfile()}} --list

# Builds the application into a JAR file
buildapp:
    #!/bin/bash
    set -e -o pipefail

    ${CWDIR}/../../gradlew build

    # need to let the Spark user be able to write to the _data mount
    mkdir -p ${CWDIR}/_data && chmod g+w ${CWDIR}/_data
    mkdir -p ${CWDIR}/_apps

    cp ${CWDIR}/build/libs/substrait-spark*.jar ${CWDIR}/_apps/app.jar
    cp ${CWDIR}/src/main/resources/*.csv ${CWDIR}/_data

# Runs a Spark dataset API query and produces a Substrait plan
dataset:
    #!/bin/bash
    set -e -o pipefail

    docker exec -it ${SPARK_MASTER_CONTAINER} bash -c "/opt/bitnami/spark/bin/spark-submit --master spark://${SPARK_MASTER_CONTAINER}:7077 --driver-memory 1G --executor-memory 1G /opt/spark-apps/app.jar SparkDataset"

# Runs a Spark SQL API query and produces a Substrait plan
sql:
    #!/bin/bash
    set -e -o pipefail

    docker exec -it ${SPARK_MASTER_CONTAINER} bash -c "/opt/bitnami/spark/bin/spark-submit --master spark://${SPARK_MASTER_CONTAINER}:7077 --driver-memory 1G --executor-memory 1G /opt/spark-apps/app.jar SparkSQL"

# Consumes the Substrait plan file passed as the argument
consume arg:
    #!/bin/bash
    set -e -o pipefail

    docker exec -it ${SPARK_MASTER_CONTAINER} bash -c "/opt/bitnami/spark/bin/spark-submit --master spark://${SPARK_MASTER_CONTAINER}:7077 --driver-memory 1G --executor-memory 1G /opt/spark-apps/app.jar SparkConsumeSubstrait {{arg}}"

# Starts a simple Spark cluster locally in docker
spark:
    #!/bin/bash
    set -e -o pipefail

    export MY_UID=$(id -u)
    export MY_GID=$(id -g)
    docker compose up
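Typical flow with these recipes: just buildapp to build app.jar and stage the CSV files, just spark to start the local cluster, then just dataset or just sql to run a query and write a Substrait plan into _data, and finally just consume spark_dataset_substrait.plan to execute a saved plan.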
45 changes: 45 additions & 0 deletions examples/substrait-spark/src/main/java/io/substrait/examples/App.java
@@ -0,0 +1,45 @@
package io.substrait.examples;

/** Main class */
public final class App {

  /** Implemented by all examples */
  public interface Action {

    /**
     * Run
     *
     * @param arg argument
     */
    void run(String arg);
  }

  private App() {}

  /**
   * Traditional main method
   *
   * @param args string[]
   */
  public static void main(String[] args) {
    try {

      if (args.length == 0) {
        args = new String[] {"SparkDataset"};
      }
      String exampleClass = args[0];

      var clz = Class.forName(App.class.getPackageName() + "." + exampleClass);
      var action = (Action) clz.getDeclaredConstructor().newInstance();

      if (args.length == 2) {
        action.run(args[1]);
      } else {
        action.run(null);
      }

    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
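spark-submit (see the justfile recipes above) passes the example class name as args[0] (SparkDataset, SparkSQL, or SparkConsumeSubstrait) and, for the consume case, the plan file name as args[1], which main hands to Action.run.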
46 changes: 46 additions & 0 deletions examples/substrait-spark/src/main/java/io/substrait/examples/SparkConsumeSubstrait.java
@@ -0,0 +1,46 @@
package io.substrait.examples;

import static io.substrait.examples.SparkHelper.ROOT_DIR;

import io.substrait.examples.util.SubstraitStringify;
import io.substrait.plan.Plan;
import io.substrait.plan.ProtoPlanConverter;
import io.substrait.spark.logical.ToLogicalPlan;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;

/** Minimal Spark application */
public class SparkConsumeSubstrait implements App.Action {

  @Override
  public void run(String arg) {

    // Connect to a local in-process Spark instance
    try (SparkSession spark = SparkHelper.connectLocalSpark()) {

      System.out.println("Reading from " + arg);
      byte[] buffer = Files.readAllBytes(Paths.get(ROOT_DIR, arg));

      io.substrait.proto.Plan proto = io.substrait.proto.Plan.parseFrom(buffer);
      ProtoPlanConverter protoToPlan = new ProtoPlanConverter();
      Plan plan = protoToPlan.from(proto);

      SubstraitStringify.explain(plan).forEach(System.out::println);

      ToLogicalPlan substraitConverter = new ToLogicalPlan(spark);
      LogicalPlan sparkPlan = substraitConverter.convert(plan);

      System.out.println(sparkPlan);

      Dataset.ofRows(spark, sparkPlan).show();

      spark.stop();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}
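The SparkHelper class used by the examples is part of this PR but not rendered in this view. A minimal sketch of what it presumably provides, inferred from the constants and methods referenced above; the concrete values (master URL, directory, CSV file names) are assumptions, not the PR's actual code:

package io.substrait.examples;

import org.apache.spark.sql.SparkSession;

/** Hypothetical sketch of SparkHelper (the real file is in this PR but not shown here). */
public final class SparkHelper {

  private SparkHelper() {}

  /** Assumed to point at the data mount defined in docker-compose.yaml. */
  public static final String ROOT_DIR = "/opt/spark-data";

  /** Assumed names of the CSV files staged into _data by the buildapp recipe. */
  public static final String VEHICLES_CSV = "vehicles.csv";
  public static final String TESTS_CSV = "tests.csv";

  /** Connect to a local in-process Spark instance (assumed configuration). */
  public static SparkSession connectLocalSpark() {
    return SparkSession.builder().master("local").getOrCreate();
  }
}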
85 changes: 85 additions & 0 deletions examples/substrait-spark/src/main/java/io/substrait/examples/SparkDataset.java
@@ -0,0 +1,85 @@
package io.substrait.examples;

import static io.substrait.examples.SparkHelper.ROOT_DIR;
import static io.substrait.examples.SparkHelper.TESTS_CSV;
import static io.substrait.examples.SparkHelper.VEHICLES_CSV;

import io.substrait.examples.util.SubstraitStringify;
import io.substrait.plan.PlanProtoConverter;
import io.substrait.spark.logical.ToSubstraitRel;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;

/** Minimal Spark application */
public class SparkDataset implements App.Action {

  @Override
  public void run(String arg) {

    // Connect to a local in-process Spark instance
    try (SparkSession spark = SparkHelper.connectLocalSpark()) {

      Dataset<Row> dsVehicles;
      Dataset<Row> dsTests;

      // load from CSV files
      String vehiclesFile = Paths.get(ROOT_DIR, VEHICLES_CSV).toString();
      String testsFile = Paths.get(ROOT_DIR, TESTS_CSV).toString();

      System.out.println("Reading " + vehiclesFile);
      System.out.println("Reading " + testsFile);

      dsVehicles = spark.read().option("delimiter", ",").option("header", "true").csv(vehiclesFile);
      dsVehicles.show();

      dsTests = spark.read().option("delimiter", ",").option("header", "true").csv(testsFile);
      dsTests.show();

      // create the joined dataset
      Dataset<Row> joinedDs =
          dsVehicles
              .join(dsTests, dsVehicles.col("vehicle_id").equalTo(dsTests.col("vehicle_id")))
              .filter(dsTests.col("test_result").equalTo("P"))
              .groupBy(dsVehicles.col("colour"))
              .count();

      joinedDs = joinedDs.orderBy(joinedDs.col("count"));
      joinedDs.show();

      LogicalPlan plan = joinedDs.queryExecution().optimizedPlan();

      System.out.println(plan);
      createSubstrait(plan);

      spark.stop();
    } catch (Exception e) {
      e.printStackTrace(System.out);
    }
  }

  /**
   * Create Substrait plan and save to file based on logical plan
   *
   * @param enginePlan logical plan
   */
  public void createSubstrait(LogicalPlan enginePlan) {
    ToSubstraitRel toSubstrait = new ToSubstraitRel();
    io.substrait.plan.Plan plan = toSubstrait.convert(enginePlan);

    SubstraitStringify.explain(plan).forEach(System.out::println);

    PlanProtoConverter planToProto = new PlanProtoConverter();
    byte[] buffer = planToProto.toProto(plan).toByteArray();
    try {
      Files.write(Paths.get(ROOT_DIR, "spark_dataset_substrait.plan"), buffer);
      System.out.println("File written to " + Paths.get(ROOT_DIR, "spark_dataset_substrait.plan"));
    } catch (IOException e) {
      e.printStackTrace(System.out);
    }
  }
}
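Taken together, SparkDataset and SparkConsumeSubstrait form a round trip. A condensed sketch of the two directions, using only classes that appear in this diff (spark and joinedDs as in the examples; exception handling omitted):

// Spark -> Substrait (as in SparkDataset.createSubstrait)
LogicalPlan logical = joinedDs.queryExecution().optimizedPlan();
io.substrait.plan.Plan pojo = new ToSubstraitRel().convert(logical);
byte[] bytes = new PlanProtoConverter().toProto(pojo).toByteArray();

// Substrait -> Spark (as in SparkConsumeSubstrait.run)
io.substrait.proto.Plan proto = io.substrait.proto.Plan.parseFrom(bytes);
io.substrait.plan.Plan restored = new ProtoPlanConverter().from(proto);
LogicalPlan sparkPlan = new ToLogicalPlan(spark).convert(restored);
Dataset.ofRows(spark, sparkPlan).show();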