diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml index 0dd2dce91..97ef214cb 100644 --- a/.github/workflows/backend.yml +++ b/.github/workflows/backend.yml @@ -40,11 +40,11 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - - name: Set up JDK 11 + - name: Set up JDK 17 uses: actions/setup-java@v2 with: - java-version: 11 - distribution: 'adopt' + java-version: 17 + distribution: 'temurin' - name: Install Protoc run: sudo apt install -y protobuf-compiler - uses: actions/cache@v4 diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml index 55e351ffb..6ffd81c94 100644 --- a/.github/workflows/codeql.yml +++ b/.github/workflows/codeql.yml @@ -36,11 +36,11 @@ jobs: uses: actions/checkout@v2 with: submodules: true - - name: Set up JDK 11 + - name: Set up JDK 17 uses: actions/setup-java@v2 with: - java-version: 11 - distribution: 'adopt' + java-version: 17 + distribution: 'temurin' - name: Install Protoc run: sudo apt install -y protobuf-compiler - uses: actions/cache@v4 diff --git a/.github/workflows/documentation.yml b/.github/workflows/documentation.yml index b2ebba156..b1bf026a9 100644 --- a/.github/workflows/documentation.yml +++ b/.github/workflows/documentation.yml @@ -36,11 +36,11 @@ jobs: API_KEY: $HOME/gems steps: - uses: actions/checkout@v2 - - name: Set up JDK 1.8 + - name: Set up JDK 17 uses: actions/setup-java@v2 with: - java-version: 8 - distribution: 'adopt' + java-version: 17 + distribution: 'temurin' - name: Install ruby run: | sudo apt install -y ruby-full build-essential zlib1g-dev diff --git a/README.md b/README.md index 6d80678f3..3e2eec588 100644 --- a/README.md +++ b/README.md @@ -81,11 +81,13 @@ source ~/.zshrc ### Requirements at Runtime -Since Apache Wayang (incubating) is not an execution engine itself but rather manages the execution engines for you, it is important to have the necessary requirements installed. +Apache Wayang (incubating) relies on external execution engines and Java to function correctly. Below are the updated runtime requirements: -- Apache Wayang supports Java versions 8 and above. However, the Wayang team recommends using Java version 11. Don’t forget to set the `JAVA_HOME` environment variable. -- You need to install Apache Spark version 3 or higher. Don’t forget to set the `SPARK_HOME` environment variable. -- You need to install Apache Hadoop version 3 or higher. Don’t forget to set the `HADOOP_HOME` environment variable. +- **Java 17**: Make sure `JAVA_HOME` is correctly set to your Java 17 installation. +- **Apache Spark 3.4.4**: Compatible with Scala 2.12. Set the `SPARK_HOME` environment variable. +- **Apache Hadoop 3+**: Set the `HADOOP_HOME` environment variable. + +> 🛠️ **Note:** When using Java 17, you _must_ add JVM flags to allow Wayang and Spark to access internal Java APIs, or you will encounter `IllegalAccessError`. See below. ### Validating the installation @@ -95,6 +97,26 @@ To execute your first application with Apache Wayang, you need to execute your p bin/wayang-submit org.apache.wayang.apps.wordcount.Main java file://$(pwd)/README.md ``` +### ⚙️ Java 17 Compatibility + +When running Wayang applications using Java 17 (especially with Spark), you must add JVM flags to open specific internal Java modules. These flags resolve access issues with `sun.nio.ch.DirectBuffer` and others. + +Update your `wayang-submit` (wayang-assembly/target/wayang-1.0.1-SNAPSHOT/bin/wayang-submit) script (or command) with: + +```bash +eval "$RUNNER \ + --add-exports=java.base/sun.nio.ch=ALL-UNNAMED \ + --add-opens=java.base/java.nio=ALL-UNNAMED \ + --add-opens=java.base/java.lang=ALL-UNNAMED \ + --add-opens=java.base/java.util=ALL-UNNAMED \ + --add-opens=java.base/java.io=ALL-UNNAMED \ + --add-opens=java.base/java.lang.reflect=ALL-UNNAMED \ + --add-opens=java.base/java.util.concurrent=ALL-UNNAMED \ + --add-opens=java.base/java.net=ALL-UNNAMED \ + --add-opens=java.base/java.lang.invoke=ALL-UNNAMED \ + $FLAGS -cp \"${WAYANG_CLASSPATH}\" $CLASS ${ARGS}" +``` + ## Getting Started Wayang is available via Maven Central. To use it with Maven, include the following code snippet into your POM file: @@ -128,15 +150,17 @@ In addition, you can obtain the most recent snapshot version of Wayang via Sonat ``` ### Prerequisites -Apache Wayang (incubating) is built with Java 11 and Scala 2.12. However, to run Apache Wayang it is sufficient to have just Java 11 installed. Please also consider that processing platforms employed by Wayang might have further requirements. +Apache Wayang (incubating) is built with Java 17 and Scala 2.12. However, to run Apache Wayang it is sufficient to have just Java 17 installed. Please also consider that processing platforms employed by Wayang might have further requirements. ``` -Java 11 -[Scala 2.12] +Java 17 +Scala 2.12.17 +Spark 3.4.4, Compatible with Scala 2.12. +Maven ``` > **NOTE:** In windows, you need to define the variable `HADOOP_HOME` with the winutils.exe, an not official option to obtain [this repository](https://github.com/steveloughran/winutils), or you can generate your winutils.exe following the instructions in the repository. Also, you may need to install [msvcr100.dll](https://www.microsoft.com/en-us/download/details.aspx?id=26999) -> **NOTE:** Make sure that the JAVA_HOME environment variable is set correctly to Java 11 as the prerequisite checker script currently supports up to Java 11 and checks the latest version of Java if you have higher version installed. In Linux, it is preferably to use the export JAVA_HOME method inside the project folder. It is also recommended running './mvnw clean install' before opening the project using IntelliJ. +> **NOTE:** Make sure that the JAVA_HOME environment variable is set correctly to Java 17 as the prerequisite checker script currently supports up to Java 17 and checks the latest version of Java if you have higher version installed. In Linux, it is preferably to use the export JAVA_HOME method inside the project folder. It is also recommended running './mvnw clean install' before opening the project using IntelliJ. ### Building @@ -152,11 +176,24 @@ If you need to rebuild Wayang, e.g., to use a different Scala version, you can s ``` > **NOTE:** If you receive an error about not finding `MathExBaseVisitor`, then the problem might be that you are trying to build from IntelliJ, without Maven. MathExBaseVisitor is generated code, and a Maven build should generate it automatically. -> **NOTE:** In the current Maven setup, the version of scala is tied to the Java version, you can compile the profile `scala-11` with Java 8 and profile `scala-12` with Java 11. +> **NOTE:**: In the current Maven setup, Wayang supports Java 17. The default Scala version is 2.12.17, which is compatible with Java 17. Ensure that your Spark distribution is also built with Scala 2.12 (e.g., `spark-3.4.4-bin-hadoop3-scala2.12`). > **NOTE:** For compiling and testing the code it is required to have Hadoop installed on your machine. > **NOTE:** the `standalone` profile to fix Hadoop and Spark versions, so that Wayang apps do not explicitly need to declare the corresponding dependencies. + +> **NOTE**: When running applications (e.g., WordCount) with Java 17, you must pass additional flags to allow internal module access: + +>--add-exports=java.base/sun.nio.ch=ALL-UNNAMED \ +--add-opens=java.base/java.nio=ALL-UNNAMED \ +--add-opens=java.base/java.lang=ALL-UNNAMED \ +--add-opens=java.base/java.util=ALL-UNNAMED \ +--add-opens=java.base/java.io=ALL-UNNAMED \ +--add-opens=java.base/java.lang.reflect=ALL-UNNAMED \ +--add-opens=java.base/java.util.concurrent=ALL-UNNAMED \ +--add-opens=java.base/java.net=ALL-UNNAMED \ +--add-opens=java.base/java.lang.invoke=ALL-UNNAMED \ + > > Also, note the `distro` profile, which assembles a binary Wayang distribution. To activate these profiles, you need to specify them when running maven, i.e., @@ -176,8 +213,8 @@ You can see examples on how to start using Wayang [here](guides/wayang-examples. ## Built With -* [Java 11](https://www.oracle.com/de/java/technologies/javase/jdk11-archive-downloads.html) -* [Scala 2.12](https://www.scala-lang.org/download/2.12.0.html) +* [Java 17](https://www.oracle.com/java/technologies/javase/17-0-14-relnotes.html) +* [Scala 2.12.17](https://www.scala-lang.org/download/2.12.17.html) * [Maven](https://maven.apache.org/) ## Contributing diff --git a/guides/tutorial.md b/guides/tutorial.md index 320b96169..0dfb914ff 100644 --- a/guides/tutorial.md +++ b/guides/tutorial.md @@ -63,6 +63,23 @@ To execute the WordCount example with Apache Wayang, you need to execute your pr cd wayang-1.0.1-SNAPSHOT ./bin/wayang-submit org.apache.wayang.apps.wordcount.Main java file://$(pwd)/README.md ``` +## If you're using Java 17, add the following JVM flags: +Update your `wayang-submit` (wayang-assembly/target/wayang-1.0.1-SNAPSHOT/bin/wayang-submit) script (or command) with: + +```shell +eval "$RUNNER \ + --add-exports=java.base/sun.nio.ch=ALL-UNNAMED \ + --add-opens=java.base/java.nio=ALL-UNNAMED \ + --add-opens=java.base/java.lang=ALL-UNNAMED \ + --add-opens=java.base/java.util=ALL-UNNAMED \ + --add-opens=java.base/java.io=ALL-UNNAMED \ + --add-opens=java.base/java.lang.reflect=ALL-UNNAMED \ + --add-opens=java.base/java.util.concurrent=ALL-UNNAMED \ + --add-opens=java.base/java.net=ALL-UNNAMED \ + --add-opens=java.base/java.lang.invoke=ALL-UNNAMED \ + $FLAGS -cp \"${WAYANG_CLASSPATH}\" $CLASS ${ARGS}" +``` + Then you should be able to see outputs like this: ![img.png](../images/wordcount_result.png) diff --git a/pom.xml b/pom.xml index 54a7efee2..37bdfcfa3 100644 --- a/pom.xml +++ b/pom.xml @@ -102,33 +102,33 @@ 2025-02-04T20:09:34Z - 2.12.12 + 2.12.17 2.12 - 3.1.2 + 3.4.4 1.20.0 - 11 - 11 - 11 - 11 - 11 + 17 + 17 + 17 + 17 + 17 2.2.11 1.1.1 1.3.2 - 3.17.2 - 2.5 - 19.0 - 1.3 - 2.10.2 - 0.8.5 + 3.25.3 + 2.15.1 + 32.1.3-jre + 2.2 + 2.15.4 + 0.8.9 2.10.6 2.4.0 - 5.6.1 - 3.5.10 + 5.10.2 + 5.11.0 1.10.0 provided - 3.1.2 + 3.3.6 org.apache.wayang.default ${basedir}/ @@ -286,16 +286,16 @@ - java11 + java17 - 11 + 17 - 11 - 11 - 11 - 11 - 11 + 17 + 17 + 17 + 17 + 17 2.2.11 1.1.1 1.3.2 @@ -314,7 +314,7 @@ - 11 + 17 @@ -325,7 +325,7 @@ org.apache.maven.plugins maven-javadoc-plugin - 11 + 17 false @@ -393,7 +393,7 @@ net.alchim31.maven scala-maven-plugin - 4.8.1 + 4.9.5 compile-scala @@ -447,9 +447,9 @@ - 2.12.12 + 2.12.17 2.12 - 3.1.2 + 3.4.4 @@ -555,7 +555,7 @@ - 4.9.1 + 4.13.1 @@ -775,7 +775,7 @@ org.apache.logging.log4j log4j-bom - 2.17.0 + 2.20.0 import pom @@ -959,8 +959,22 @@ org.apache.maven.plugins maven-surefire-plugin - 3.0.0-M5 + 3.0.0 + + --add-exports=java.base/sun.nio.ch=ALL-UNNAMED + --add-exports=java.base/jdk.internal.misc=ALL-UNNAMED + --add-opens=java.base/java.nio=ALL-UNNAMED + --add-opens=java.base/sun.nio.ch=ALL-UNNAMED + --add-opens=java.base/java.lang=ALL-UNNAMED + --add-opens=java.base/java.util=ALL-UNNAMED + --add-opens=java.base/java.io=ALL-UNNAMED + --add-opens=java.base/java.lang.reflect=ALL-UNNAMED + --add-opens=java.base/sun.reflect.annotation=ALL-UNNAMED + --add-opens=java.base/java.util.concurrent=ALL-UNNAMED + --add-opens=java.base/java.net=ALL-UNNAMED + --add-opens=java.base/java.lang.invoke=ALL-UNNAMED + false @@ -1242,7 +1256,7 @@ org.apache.maven.plugins maven-enforcer-plugin - 3.0.0 + 3.0.0-M3 enforce-maven-version @@ -1263,7 +1277,7 @@ org.jacoco jacoco-maven-plugin - 0.8.3 + 0.8.9 coverage-initialize diff --git a/src/main/script/prerequisiteCheck.groovy b/src/main/script/prerequisiteCheck.groovy index a076b649a..d3d561695 100644 --- a/src/main/script/prerequisiteCheck.groovy +++ b/src/main/script/prerequisiteCheck.groovy @@ -190,7 +190,7 @@ if (os == "windows") { // profiles. ///////////////////////////////////////////////////// -checkJavaVersion("1.8", "11") +checkJavaVersion("1.8", "17") // Check if hadoop is available // It seems that this is only required on Windows systems. diff --git a/wayang-api/wayang-api-json/pom.xml b/wayang-api/wayang-api-json/pom.xml index bb31dd916..064de548b 100644 --- a/wayang-api/wayang-api-json/pom.xml +++ b/wayang-api/wayang-api-json/pom.xml @@ -34,10 +34,10 @@ org.apache.wayang.api.json - 1.8 - 1.8 + 17 + 17 UTF-8 - 2.12.10 + 2.12.17 2.12 4.2.0 @@ -125,7 +125,7 @@ com.fasterxml.jackson.module jackson-module-scala_2.12 - 2.13.1 + 2.15.4 diff --git a/wayang-api/wayang-api-scala-java/pom.xml b/wayang-api/wayang-api-scala-java/pom.xml index 6f57caa35..7bcf9b60f 100644 --- a/wayang-api/wayang-api-scala-java/pom.xml +++ b/wayang-api/wayang-api-scala-java/pom.xml @@ -34,7 +34,7 @@ org.apache.wayang.api - 1.0.0-rc.2 + 1.0.0 @@ -131,7 +131,7 @@ io.netty netty-all - 4.1.45.Final + 4.1.89.Final com.google.protobuf @@ -141,6 +141,7 @@ com.fasterxml.jackson.core jackson-databind + 2.15.4 com.fasterxml.jackson.module diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/package.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/package.scala index 2514f09ee..da2babc66 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/package.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/package.scala @@ -35,8 +35,8 @@ import scala.language.implicitConversions import scala.reflect.ClassTag /** - * Provides implicits for the basic Wayang API. - */ + * Provides implicits for the basic Wayang API. + */ /** * TODO: add the documentation in the implicit of org.apache.wayang.api * labels: documentation,todo @@ -151,4 +151,4 @@ package object api { implicit def elevateRecordDataQuanta(dataQuanta: DataQuanta[Record]): RecordDataQuanta = new RecordDataQuanta(dataQuanta) -} +} \ No newline at end of file diff --git a/wayang-applications/pom.xml b/wayang-applications/pom.xml index 57477baff..f51482982 100644 --- a/wayang-applications/pom.xml +++ b/wayang-applications/pom.xml @@ -38,23 +38,23 @@ https://kamir.solidcommunity.net/public/ecolytiq-sustainability-profile ${project.basedir}/../wayang-assembly/target/apache-wayang-assembly-0.7.1-incubating-dist/wayang-0.7.1 - 3.5.0 - 11 - 11 + 3.4.4 + 17 + 17 UTF-8 - 3.17.2 - 2.5 - 19.0 - 1.3 - 2.10.2 - 0.8.5 + 3.25.3 + 2.15.1 + 32.1.3-jre + 2.2 + 2.15.4 + 0.8.9 2.10.6 2.4.0 - 5.6.1 - 3.5.10 + 5.10.2 + 5.11.0 @@ -92,12 +92,12 @@ org.scala-lang scala-library - 2.12.0 + 2.12.17 org.slf4j slf4j-simple - 1.7.13 + 2.0.6 diff --git a/wayang-benchmark/pom.xml b/wayang-benchmark/pom.xml index ff0516aba..574dd3f3a 100644 --- a/wayang-benchmark/pom.xml +++ b/wayang-benchmark/pom.xml @@ -67,17 +67,17 @@ org.apache.hadoop hadoop-aws - 3.1.2 + 3.3.6 org.apache.hadoop hadoop-common - 3.2.4 + 3.3.6 org.apache.hadoop hadoop-client - 3.1.2 + 3.3.6 org.apache.wayang diff --git a/wayang-benchmark/src/main/java/org/apache/wayang/apps/pi/PiEstimation.java b/wayang-benchmark/src/main/java/org/apache/wayang/apps/pi/PiEstimation.java new file mode 100644 index 000000000..3f7b511f2 --- /dev/null +++ b/wayang-benchmark/src/main/java/org/apache/wayang/apps/pi/PiEstimation.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.wayang.apps.pi; + + + +import org.apache.wayang.api.JavaPlanBuilder; +import org.apache.wayang.core.api.WayangContext; +import org.apache.wayang.core.api.Configuration; +import org.apache.wayang.java.Java; +import org.apache.wayang.spark.Spark; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +public class PiEstimation { + public static void main(String[] args) { + try { + if (args.length == 0) { + System.err.print("Usage: [,]* "); + System.exit(1); + } + + + + int slices = (args.length > 1) ? Integer.parseInt(args[1]) : 2; + int n = 100000 * slices; // Total points + + var wayangContext = new WayangContext(new Configuration()); + for (String platform : args[0].split(",")) { + switch (platform) { + case "java": + wayangContext.register(Java.basicPlugin()); + break; + case "spark": + wayangContext.register(Spark.basicPlugin()); + break; + default: + System.err.format("Unknown platform: \"%s\"\n", platform); + System.exit(3); + return; + } + } + + var planBuilder = new JavaPlanBuilder(wayangContext) + .withJobName("Wayang Pi Estimation"); + + // Create dataset of numbers from 0 to n + List numbers = new ArrayList<>(); + for (int i = 0; i < n; i++) { + numbers.add(i); + } + + // Wayang data pipeline + long count = planBuilder + .loadCollection(numbers) + .map(i -> { + double x = ThreadLocalRandom.current().nextDouble(-1, 1); + double y = ThreadLocalRandom.current().nextDouble(-1, 1); + return (x * x + y * y <= 1) ? 1 : 0; + }) + .reduce(Integer::sum) + .collect() + .iterator().next(); + + // Estimate Pi + double pi = 4.0 * count / n; + System.out.println("Pi is roughly: " + pi); + + } catch (NumberFormatException e) { + System.err.println("Invalid number format for slices. Please provide an integer."); + } catch (IllegalArgumentException e) { + System.err.println(e.getMessage()); + } catch (Exception e) { + System.err.println("Unexpected error occurred: " + e.getMessage()); + e.printStackTrace(); + } + } +} \ No newline at end of file diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/model/LogisticRegressionModel.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/model/LogisticRegressionModel.java new file mode 100644 index 000000000..fb1f961e3 --- /dev/null +++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/model/LogisticRegressionModel.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.basic.model; + + + + +public interface LogisticRegressionModel extends Model { + // No extra methods for now — predictions handled via SparkMLModel +} diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/LogisticRegressionOperator.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/LogisticRegressionOperator.java new file mode 100644 index 000000000..9f07841a1 --- /dev/null +++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/LogisticRegressionOperator.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.basic.operators; + + +import org.apache.wayang.basic.model.LogisticRegressionModel; +import org.apache.wayang.core.api.Configuration; +import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator; +import org.apache.wayang.core.plan.wayangplan.BinaryToUnaryOperator; +import org.apache.wayang.core.types.DataSetType; + +import java.util.Optional; + +public class LogisticRegressionOperator extends BinaryToUnaryOperator { + + protected boolean fitIntercept; + + public LogisticRegressionOperator(boolean fitIntercept) { + super( + DataSetType.createDefaultUnchecked(double[].class), + DataSetType.createDefaultUnchecked(Double.class), + DataSetType.createDefaultUnchecked(LogisticRegressionModel.class), + false + ); + this.fitIntercept = fitIntercept; + } + + public LogisticRegressionOperator(LogisticRegressionOperator that) { + super(that); + this.fitIntercept = that.fitIntercept; + } + + public boolean getFitIntercept() { + return this.fitIntercept; + } + + @Override + public Optional createCardinalityEstimator(int outputIndex, Configuration configuration) { + return super.createCardinalityEstimator(outputIndex, configuration); + } +} diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/PredictOperators.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/PredictOperators.java index 869d5f710..7a337c802 100644 --- a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/PredictOperators.java +++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/PredictOperators.java @@ -35,4 +35,7 @@ public static PredictOperator linearRegression() { public static PredictOperator decisionTreeClassification() { return new PredictOperator<>(new TypeReference() {}, new TypeReference() {}); } + public static PredictOperator logisticRegression() { + return new PredictOperator<>(new TypeReference() {}, new TypeReference() {}); + } } diff --git a/wayang-commons/wayang-core/pom.xml b/wayang-commons/wayang-core/pom.xml index 348876dde..784a54187 100644 --- a/wayang-commons/wayang-core/pom.xml +++ b/wayang-commons/wayang-core/pom.xml @@ -84,7 +84,7 @@ org.antlr antlr4-runtime - 4.9.1 + 4.13.1 org.apache.logging.log4j @@ -99,9 +99,20 @@ wayang-utils-profile-db 1.0.1-SNAPSHOT + + com.fasterxml.jackson.core + jackson-core + 2.15.4 + com.fasterxml.jackson.core jackson-databind + 2.15.4 + + + com.fasterxml.jackson.core + jackson-annotations + 2.15.4 com.amazonaws diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/ExplainUtils.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/ExplainUtils.java index f9cfd4d1d..1a5bed5fd 100644 --- a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/ExplainUtils.java +++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/ExplainUtils.java @@ -18,7 +18,7 @@ package org.apache.wayang.core.util; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.wayang.core.api.exception.WayangException; import org.apache.wayang.core.plan.executionplan.ExecutionPlan; import org.apache.wayang.core.plan.wayangplan.WayangPlan; diff --git a/wayang-commons/wayang-utils-profile-db/pom.xml b/wayang-commons/wayang-utils-profile-db/pom.xml index 47f3d76be..9368364bf 100644 --- a/wayang-commons/wayang-utils-profile-db/pom.xml +++ b/wayang-commons/wayang-utils-profile-db/pom.xml @@ -38,4 +38,5 @@ + diff --git a/wayang-ml4all/pom.xml b/wayang-ml4all/pom.xml index 17897da43..e0737f54c 100644 --- a/wayang-ml4all/pom.xml +++ b/wayang-ml4all/pom.xml @@ -77,7 +77,7 @@ org.apache.spark spark-core_2.12 - 3.3.3 + 3.4.4 org.apache.spark @@ -97,7 +97,7 @@ org.scala-lang scala-library - 2.12.12 + 2.12.17 diff --git a/wayang-platforms/wayang-java/pom.xml b/wayang-platforms/wayang-java/pom.xml index ae2c0c74d..a0405e971 100644 --- a/wayang-platforms/wayang-java/pom.xml +++ b/wayang-platforms/wayang-java/pom.xml @@ -71,18 +71,18 @@ org.slf4j slf4j-api - 1.7.36 + 2.0.6 org.apache.logging.log4j log4j-slf4j-impl - 2.17.2 + 2.20.0 org.mockito mockito-core - 4.5.1 + 5.11.0 test diff --git a/wayang-platforms/wayang-spark/pom.xml b/wayang-platforms/wayang-spark/pom.xml index 7cb6a6a48..36a154c1d 100644 --- a/wayang-platforms/wayang-spark/pom.xml +++ b/wayang-platforms/wayang-spark/pom.xml @@ -59,17 +59,17 @@ org.xerial.snappy snappy-java 1.1.10.4 - ${external.platforms.scope} + compile org.apache.hadoop hadoop-common - 3.4.0 + 3.3.6 org.apache.hadoop hadoop-client - 2.7.7 + 3.3.6 org.apache.kafka @@ -79,7 +79,7 @@ org.apache.spark spark-core_2.12 - ${spark.version} + 3.4.4 org.xerial.snappy @@ -90,12 +90,17 @@ org.apache.spark spark-graphx_2.12 - ${spark.version} + 3.4.4 + + + org.apache.spark + spark-sql_2.12 + 3.4.4 org.apache.spark spark-mllib_2.12 - ${spark.version} + 3.4.4 @@ -108,7 +113,12 @@ com.fasterxml.jackson.core jackson-databind - 2.10.0 + ${jackson.version} + + + org.antlr + antlr4-runtime + 4.8 diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java index b884acf95..f22f2f3b5 100644 --- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java +++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java @@ -73,6 +73,7 @@ public class Mappings { new LinearRegressionMapping(), new DecisionTreeClassificationMapping(), new ModelTransformMapping(), + new LogisticRegressionMapping(), new PredictMapping() ); diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/ml/LogisticRegressionMapping.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/ml/LogisticRegressionMapping.java new file mode 100644 index 000000000..5a5d53af9 --- /dev/null +++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/ml/LogisticRegressionMapping.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.wayang.spark.mapping.ml; + +import org.apache.wayang.basic.operators.LogisticRegressionOperator; +import org.apache.wayang.core.mapping.*; +import org.apache.wayang.spark.operators.ml.SparkLogisticRegressionOperator; +import org.apache.wayang.spark.platform.SparkPlatform; + +import java.util.Collection; +import java.util.Collections; + +/** + * Mapping from {@link LogisticRegressionOperator} to {@link SparkLogisticRegressionOperator}. + */ +@SuppressWarnings("unchecked") +public class LogisticRegressionMapping implements Mapping { + + @Override + public Collection getTransformations() { + return Collections.singleton(new PlanTransformation( + this.createSubplanPattern(), + this.createReplacementSubplanFactory(), + SparkPlatform.getInstance() + )); + } + + private SubplanPattern createSubplanPattern() { + final OperatorPattern operatorPattern = new OperatorPattern( + "logisticRegression", new LogisticRegressionOperator(true), false + ); + return SubplanPattern.createSingleton(operatorPattern); + } + + private ReplacementSubplanFactory createReplacementSubplanFactory() { + return new ReplacementSubplanFactory.OfSingleOperators( + (matchedOperator, epoch) -> new SparkLogisticRegressionOperator(matchedOperator).at(epoch) + ); + } +} diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkRandomPartitionSampleOperator.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkRandomPartitionSampleOperator.java index 25212bef9..ce84f78d0 100644 --- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkRandomPartitionSampleOperator.java +++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkRandomPartitionSampleOperator.java @@ -284,4 +284,4 @@ public List apply(scala.collection.Iterator iterator) { list.add(element); return list; } -} +} \ No newline at end of file diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkShufflePartitionSampleOperator.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkShufflePartitionSampleOperator.java index 3169da7bb..614734063 100644 --- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkShufflePartitionSampleOperator.java +++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkShufflePartitionSampleOperator.java @@ -230,4 +230,4 @@ public List apply(scala.collection.Iterator iterator) { } return list; } -} +} \ No newline at end of file diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/ml/SparkLogisticRegressionOperator.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/ml/SparkLogisticRegressionOperator.java new file mode 100644 index 000000000..3d47362d0 --- /dev/null +++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/ml/SparkLogisticRegressionOperator.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.spark.operators.ml; + + + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.ml.classification.LogisticRegression; +import org.apache.spark.ml.classification.LogisticRegressionModel; +import org.apache.spark.ml.linalg.Vector; +import org.apache.spark.ml.linalg.VectorUDT; +import org.apache.spark.ml.linalg.Vectors; +import org.apache.spark.sql.*; +import org.apache.spark.sql.types.*; +import org.apache.wayang.basic.data.Tuple2; +import org.apache.wayang.basic.operators.LogisticRegressionOperator; +import org.apache.wayang.core.optimizer.OptimizationContext; +import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; +import org.apache.wayang.core.platform.ChannelDescriptor; +import org.apache.wayang.core.platform.ChannelInstance; +import org.apache.wayang.core.platform.lineage.ExecutionLineageNode; +import org.apache.wayang.core.util.Tuple; +import org.apache.wayang.java.channels.CollectionChannel; +import org.apache.wayang.spark.channels.RddChannel; +import org.apache.wayang.spark.execution.SparkExecutor; +import org.apache.wayang.spark.model.SparkMLModel; +import org.apache.wayang.spark.operators.SparkExecutionOperator; + +import java.util.*; + +public class SparkLogisticRegressionOperator extends LogisticRegressionOperator implements SparkExecutionOperator { + + private static final StructType SCHEMA = DataTypes.createStructType(new StructField[]{ + DataTypes.createStructField(Attr.LABEL, DataTypes.DoubleType, false), + DataTypes.createStructField(Attr.FEATURES, new VectorUDT(), false) + }); + + private static Dataset convertToDataFrame(JavaRDD features, JavaRDD labels) { + JavaPairRDD zipped = features.zip(labels); + JavaRDD rowRDD = zipped.map(e -> RowFactory.create(e._2, Vectors.dense(e._1))); + return SparkSession.builder().getOrCreate().createDataFrame(rowRDD, SCHEMA); + } + + public SparkLogisticRegressionOperator(boolean fitIntercept) { + super(fitIntercept); + } + + public SparkLogisticRegressionOperator(LogisticRegressionOperator that) { + super(that); + } + + @Override + public List getSupportedInputChannels(int index) { + return Arrays.asList(RddChannel.UNCACHED_DESCRIPTOR, RddChannel.CACHED_DESCRIPTOR); + } + + @Override + public List getSupportedOutputChannels(int index) { + return Collections.singletonList(CollectionChannel.DESCRIPTOR); + } + + @Override + public Tuple, Collection> evaluate( + ChannelInstance[] inputs, + ChannelInstance[] outputs, + SparkExecutor sparkExecutor, + OptimizationContext.OperatorContext operatorContext) { + + RddChannel.Instance featuresInput = (RddChannel.Instance) inputs[0]; + RddChannel.Instance labelsInput = (RddChannel.Instance) inputs[1]; + CollectionChannel.Instance output = (CollectionChannel.Instance) outputs[0]; + + JavaRDD features = featuresInput.provideRdd(); + JavaRDD labels = labelsInput.provideRdd(); + + Dataset df = convertToDataFrame(features, labels); + + LogisticRegression lr = new LogisticRegression() + .setFitIntercept(this.fitIntercept) + .setLabelCol(Attr.LABEL) + .setFeaturesCol(Attr.FEATURES) + .setPredictionCol(Attr.PREDICTION); + + LogisticRegressionModel model = lr.fit(df); + output.accept(Collections.singletonList(new Model(model))); + + return ExecutionOperator.modelLazyExecution(inputs, outputs, operatorContext); + } + + @Override + public boolean containsAction() { + return false; + } + + public static class Model implements org.apache.wayang.basic.model.LogisticRegressionModel, SparkMLModel { + + private static final StructType PREDICT_SCHEMA = DataTypes.createStructType(new StructField[]{ + DataTypes.createStructField(Attr.FEATURES, new VectorUDT(), false) + }); + + private final LogisticRegressionModel model; + + public Model(LogisticRegressionModel model) { + this.model = model; + } + + private static Dataset convertToDataFrame(JavaRDD features) { + JavaRDD rows = features.map(f -> RowFactory.create(Vectors.dense(f))); + return SparkSession.builder().getOrCreate().createDataFrame(rows, PREDICT_SCHEMA); + } + + @Override + public JavaRDD> transform(JavaRDD input) { + Dataset df = convertToDataFrame(input); + Dataset result = model.transform(df); + return result.toJavaRDD().map(row -> + new Tuple2<>(row.getAs(Attr.FEATURES), row.getAs(Attr.PREDICTION))); + } + + @Override + public JavaRDD predict(JavaRDD input) { + Dataset df = convertToDataFrame(input); + Dataset result = model.transform(df); + return result.toJavaRDD().map(row -> row.getAs(Attr.PREDICTION)); + } + } +} diff --git a/wayang-platforms/wayang-tensorflow/pom.xml b/wayang-platforms/wayang-tensorflow/pom.xml index 5d8be0968..c6faec028 100644 --- a/wayang-platforms/wayang-tensorflow/pom.xml +++ b/wayang-platforms/wayang-tensorflow/pom.xml @@ -34,9 +34,9 @@ - 11 - 11 - 1.0.0-rc.2 + 17 + 17 + 1.0.0 diff --git a/wayang-profiler/pom.xml b/wayang-profiler/pom.xml index bd44e442c..e32b07ada 100644 --- a/wayang-profiler/pom.xml +++ b/wayang-profiler/pom.xml @@ -94,5 +94,11 @@ spark-mllib_2.12 ${spark.version} + + org.apache.commons + commons-lang3 + 3.12.0 + + diff --git a/wayang-profiler/src/main/java/org/apache/wayang/profiler/hardware/DiskProfiler.java b/wayang-profiler/src/main/java/org/apache/wayang/profiler/hardware/DiskProfiler.java index f72b37da3..9373b6672 100644 --- a/wayang-profiler/src/main/java/org/apache/wayang/profiler/hardware/DiskProfiler.java +++ b/wayang-profiler/src/main/java/org/apache/wayang/profiler/hardware/DiskProfiler.java @@ -18,7 +18,7 @@ package org.apache.wayang.profiler.hardware; -import org.apache.commons.lang.Validate; +import org.apache.commons.lang3.Validate; import org.apache.wayang.core.util.Formats; import org.apache.wayang.core.util.fs.FileSystem; import org.apache.wayang.core.util.fs.FileSystems; diff --git a/wayang-tests-integration/pom.xml b/wayang-tests-integration/pom.xml index d498b19e0..d6509e345 100644 --- a/wayang-tests-integration/pom.xml +++ b/wayang-tests-integration/pom.xml @@ -91,17 +91,17 @@ org.apache.spark spark-core_2.12 - ${spark.version} + 3.4.4 org.apache.spark spark-graphx_2.12 - ${spark.version} + 3.4.4 org.apache.spark spark-mllib_2.12 - ${spark.version} + 3.4.4 @@ -224,13 +224,13 @@ org.codehaus.janino janino - 3.0.16 + 3.1.6 org.codehaus.janino commons-compiler - 3.0.16 + 3.1.6 @@ -263,6 +263,24 @@ org.apache.maven.plugins maven-failsafe-plugin + 2.22.2 + + + --add-exports=java.base/sun.nio.ch=ALL-UNNAMED + --add-opens=java.base/java.nio=ALL-UNNAMED + --add-opens=java.base/sun.nio.ch=ALL-UNNAMED + --add-opens=java.base/java.lang=ALL-UNNAMED + --add-opens=java.base/java.util=ALL-UNNAMED + --add-opens=java.base/java.io=ALL-UNNAMED + --add-opens=java.base/java.lang.reflect=ALL-UNNAMED + --add-exports=java.base/jdk.internal.misc=ALL-UNNAMED + --add-opens=java.base/sun.reflect.annotation=ALL-UNNAMED + --add-opens=java.base/java.util.concurrent=ALL-UNNAMED + --add-opens=java.base/java.net=ALL-UNNAMED + --add-opens=java.base/java.lang.invoke=ALL-UNNAMED + + false + @@ -273,10 +291,26 @@ + org.apache.maven.plugins maven-surefire-plugin + 3.0.0 + + --add-exports=java.base/sun.nio.ch=ALL-UNNAMED + --add-opens=java.base/java.nio=ALL-UNNAMED + --add-opens=java.base/sun.nio.ch=ALL-UNNAMED + --add-opens=java.base/java.lang=ALL-UNNAMED + --add-opens=java.base/java.util=ALL-UNNAMED + --add-opens=java.base/java.io=ALL-UNNAMED + --add-opens=java.base/java.lang.reflect=ALL-UNNAMED + --add-exports=java.base/jdk.internal.misc=ALL-UNNAMED + --add-opens=java.base/sun.reflect.annotation=ALL-UNNAMED + --add-opens=java.base/java.util.concurrent=ALL-UNNAMED + --add-opens=java.base/java.net=ALL-UNNAMED + --add-opens=java.base/java.lang.invoke=ALL-UNNAMED + false false diff --git a/wayang-tests-integration/src/test/java/org/apache/wayang/tests/SparkIntegrationIT.java b/wayang-tests-integration/src/test/java/org/apache/wayang/tests/SparkIntegrationIT.java index ccea3d9f0..ac89bfae9 100644 --- a/wayang-tests-integration/src/test/java/org/apache/wayang/tests/SparkIntegrationIT.java +++ b/wayang-tests-integration/src/test/java/org/apache/wayang/tests/SparkIntegrationIT.java @@ -449,6 +449,50 @@ public void testBroadcasts() { Assert.assertEquals(expectedValues, collectedValues); } + @Test + public void testLogisticRegressionOperator() { + CollectionSource xSource = new CollectionSource<>( + Arrays.asList( + new double[]{0.0, 1.0}, + new double[]{1.0, 0.0}, + new double[]{1.0, 1.0}, + new double[]{0.0, 0.0} + ), double[].class + ); + CollectionSource ySource = new CollectionSource<>( + Arrays.asList(1.0, 1.0, 0.0, 0.0), Double.class + ); + + xSource.addTargetPlatform(Spark.platform()); + ySource.addTargetPlatform(Spark.platform()); + + LogisticRegressionOperator train = new LogisticRegressionOperator(true); + PredictOperator predict = PredictOperators.logisticRegression(); + + List results = new ArrayList<>(); + LocalCallbackSink sink = LocalCallbackSink.createCollectingSink(results, DataSetType.createDefault(Double.class)); + + xSource.connectTo(0, train, 0); + ySource.connectTo(0, train, 1); + train.connectTo(0, predict, 0); + xSource.connectTo(0, predict, 1); + predict.connectTo(0, sink, 0); + + WayangPlan plan = new WayangPlan(sink); + + WayangContext context = new WayangContext() + .with(Spark.basicPlugin()) + .with(Spark.mlPlugin()); + + context.execute(plan); + + Assert.assertEquals(4, results.size()); + for (double pred : results) { + Assert.assertTrue(pred == 0.0 || pred == 1.0); + } + } + + @Test public void testKMeans() { CollectionSource collectionSource = new CollectionSource<>(