From 897f17c7e1a5a31a3ea96057656e5e33bb07a4de Mon Sep 17 00:00:00 2001 From: xristlamp Date: Thu, 20 Mar 2025 16:20:56 +0100 Subject: [PATCH 1/7] Updated project to Java 17 and PiEstimation test --- pom.xml | 28 +++--- src/main/script/prerequisiteCheck.groovy | 2 +- .../apache/wayang/apps/pi/PiEstimation.java | 94 +++++++++++++++++++ 3 files changed, 109 insertions(+), 15 deletions(-) create mode 100644 wayang-benchmark/src/main/java/org/apache/wayang/apps/pi/PiEstimation.java diff --git a/pom.xml b/pom.xml index 580323758..f89cb71e7 100644 --- a/pom.xml +++ b/pom.xml @@ -107,11 +107,11 @@ 3.1.2 1.20.0 - 11 - 11 - 11 - 11 - 11 + 17 + 17 + 17 + 17 + 17 2.2.11 1.1.1 1.3.2 @@ -286,16 +286,16 @@ - java11 + java17 - 11 + 17 - 11 - 11 - 11 - 11 - 11 + 17 + 17 + 17 + 17 + 17 2.2.11 1.1.1 1.3.2 @@ -314,7 +314,7 @@ - 11 + 17 @@ -325,7 +325,7 @@ org.apache.maven.plugins maven-javadoc-plugin - 11 + 17 false diff --git a/src/main/script/prerequisiteCheck.groovy b/src/main/script/prerequisiteCheck.groovy index a076b649a..d3d561695 100644 --- a/src/main/script/prerequisiteCheck.groovy +++ b/src/main/script/prerequisiteCheck.groovy @@ -190,7 +190,7 @@ if (os == "windows") { // profiles. ///////////////////////////////////////////////////// -checkJavaVersion("1.8", "11") +checkJavaVersion("1.8", "17") // Check if hadoop is available // It seems that this is only required on Windows systems. diff --git a/wayang-benchmark/src/main/java/org/apache/wayang/apps/pi/PiEstimation.java b/wayang-benchmark/src/main/java/org/apache/wayang/apps/pi/PiEstimation.java new file mode 100644 index 000000000..3f7b511f2 --- /dev/null +++ b/wayang-benchmark/src/main/java/org/apache/wayang/apps/pi/PiEstimation.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.wayang.apps.pi; + + + +import org.apache.wayang.api.JavaPlanBuilder; +import org.apache.wayang.core.api.WayangContext; +import org.apache.wayang.core.api.Configuration; +import org.apache.wayang.java.Java; +import org.apache.wayang.spark.Spark; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +public class PiEstimation { + public static void main(String[] args) { + try { + if (args.length == 0) { + System.err.print("Usage: [,]* "); + System.exit(1); + } + + + + int slices = (args.length > 1) ? Integer.parseInt(args[1]) : 2; + int n = 100000 * slices; // Total points + + var wayangContext = new WayangContext(new Configuration()); + for (String platform : args[0].split(",")) { + switch (platform) { + case "java": + wayangContext.register(Java.basicPlugin()); + break; + case "spark": + wayangContext.register(Spark.basicPlugin()); + break; + default: + System.err.format("Unknown platform: \"%s\"\n", platform); + System.exit(3); + return; + } + } + + var planBuilder = new JavaPlanBuilder(wayangContext) + .withJobName("Wayang Pi Estimation"); + + // Create dataset of numbers from 0 to n + List numbers = new ArrayList<>(); + for (int i = 0; i < n; i++) { + numbers.add(i); + } + + // Wayang data pipeline + long count = planBuilder + .loadCollection(numbers) + .map(i -> { + double x = ThreadLocalRandom.current().nextDouble(-1, 1); + double y = ThreadLocalRandom.current().nextDouble(-1, 1); + return (x * x + y * y <= 1) ? 1 : 0; + }) + .reduce(Integer::sum) + .collect() + .iterator().next(); + + // Estimate Pi + double pi = 4.0 * count / n; + System.out.println("Pi is roughly: " + pi); + + } catch (NumberFormatException e) { + System.err.println("Invalid number format for slices. Please provide an integer."); + } catch (IllegalArgumentException e) { + System.err.println(e.getMessage()); + } catch (Exception e) { + System.err.println("Unexpected error occurred: " + e.getMessage()); + e.printStackTrace(); + } + } +} \ No newline at end of file From 84f20c9680a65b715c5b20c924e3a9956f1f3230 Mon Sep 17 00:00:00 2001 From: xristlamp Date: Tue, 8 Apr 2025 16:48:55 +0200 Subject: [PATCH 2/7] Updated project to Java 17 mvn install -DskipTests -X works --- pom.xml | 51 ++++++++++++------- wayang-api/wayang-api-json/pom.xml | 8 +-- wayang-api/wayang-api-scala-java/pom.xml | 3 +- .../scala/org/apache/wayang/api/package.scala | 6 +-- wayang-applications/pom.xml | 24 ++++----- wayang-benchmark/pom.xml | 6 +-- wayang-commons/wayang-core/pom.xml | 13 ++++- .../apache/wayang/core/util/ExplainUtils.java | 2 +- .../wayang-utils-profile-db/pom.xml | 1 + wayang-ml4all/pom.xml | 4 +- wayang-platforms/wayang-java/pom.xml | 4 +- wayang-platforms/wayang-spark/pom.xml | 17 ++++--- .../SparkRandomPartitionSampleOperator.java | 2 +- .../SparkShufflePartitionSampleOperator.java | 2 +- wayang-platforms/wayang-tensorflow/pom.xml | 6 +-- wayang-profiler/pom.xml | 6 +++ .../profiler/hardware/DiskProfiler.java | 2 +- 17 files changed, 97 insertions(+), 60 deletions(-) diff --git a/pom.xml b/pom.xml index 8f722e9c3..3110072c3 100644 --- a/pom.xml +++ b/pom.xml @@ -102,9 +102,9 @@ 2025-02-04T20:09:34Z - 2.12.12 + 2.12.17 2.12 - 3.1.2 + 3.4.1 1.20.0 17 @@ -116,19 +116,19 @@ 1.1.1 1.3.2 - 3.17.2 - 2.5 - 19.0 - 1.3 - 2.10.2 - 0.8.5 + 3.25.3 + 2.15.1 + 32.1.3-jre + 2.2 + 2.15.4 + 0.8.9 2.10.6 2.4.0 - 5.6.1 - 3.5.10 + 5.10.2 + 5.11.0 1.10.0 provided - 3.1.2 + 3.3.6 org.apache.wayang.default ${basedir}/ @@ -393,7 +393,7 @@ net.alchim31.maven scala-maven-plugin - 4.8.1 + 4.9.5 compile-scala @@ -447,9 +447,9 @@ - 2.12.12 + 2.12.17 2.12 - 3.1.2 + 3.4.1 @@ -555,7 +555,7 @@ - 4.9.1 + 4.13.1 @@ -775,7 +775,7 @@ org.apache.logging.log4j log4j-bom - 2.17.0 + 2.20.0 import pom @@ -959,8 +959,21 @@ org.apache.maven.plugins maven-surefire-plugin - 3.0.0-M5 + 3.0.0 + + --add-exports=java.base/sun.nio.ch=ALL-UNNAMED + --add-opens=java.base/java.nio=ALL-UNNAMED + --add-opens=java.base/sun.nio.ch=ALL-UNNAMED + --add-opens=java.base/java.lang=ALL-UNNAMED + --add-opens=java.base/java.util=ALL-UNNAMED + --add-opens=java.base/java.io=ALL-UNNAMED + --add-opens=java.base/java.lang.reflect=ALL-UNNAMED + --add-exports=java.base/jdk.internal.misc=ALL-UNNAMED + --add-opens=java.base/sun.reflect.annotation=ALL-UNNAMED + --add-opens=java.base/java.util.concurrent=ALL-UNNAMED + --add-opens java.base/java.net=ALL-UNNAMED + false @@ -1242,7 +1255,7 @@ org.apache.maven.plugins maven-enforcer-plugin - 3.0.0 + 3.0.0-M3 enforce-maven-version @@ -1263,7 +1276,7 @@ org.jacoco jacoco-maven-plugin - 0.8.3 + 0.8.9 coverage-initialize diff --git a/wayang-api/wayang-api-json/pom.xml b/wayang-api/wayang-api-json/pom.xml index bb31dd916..064de548b 100644 --- a/wayang-api/wayang-api-json/pom.xml +++ b/wayang-api/wayang-api-json/pom.xml @@ -34,10 +34,10 @@ org.apache.wayang.api.json - 1.8 - 1.8 + 17 + 17 UTF-8 - 2.12.10 + 2.12.17 2.12 4.2.0 @@ -125,7 +125,7 @@ com.fasterxml.jackson.module jackson-module-scala_2.12 - 2.13.1 + 2.15.4 diff --git a/wayang-api/wayang-api-scala-java/pom.xml b/wayang-api/wayang-api-scala-java/pom.xml index b4c206a10..9af47e67b 100644 --- a/wayang-api/wayang-api-scala-java/pom.xml +++ b/wayang-api/wayang-api-scala-java/pom.xml @@ -34,7 +34,7 @@ org.apache.wayang.api - 1.0.0-rc.2 + 1.0.0 @@ -141,6 +141,7 @@ com.fasterxml.jackson.core jackson-databind + 2.15.4 com.fasterxml.jackson.module diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/package.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/package.scala index 2514f09ee..da2babc66 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/package.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/package.scala @@ -35,8 +35,8 @@ import scala.language.implicitConversions import scala.reflect.ClassTag /** - * Provides implicits for the basic Wayang API. - */ + * Provides implicits for the basic Wayang API. + */ /** * TODO: add the documentation in the implicit of org.apache.wayang.api * labels: documentation,todo @@ -151,4 +151,4 @@ package object api { implicit def elevateRecordDataQuanta(dataQuanta: DataQuanta[Record]): RecordDataQuanta = new RecordDataQuanta(dataQuanta) -} +} \ No newline at end of file diff --git a/wayang-applications/pom.xml b/wayang-applications/pom.xml index 45db6b8ea..b5294b3ca 100644 --- a/wayang-applications/pom.xml +++ b/wayang-applications/pom.xml @@ -38,23 +38,23 @@ https://kamir.solidcommunity.net/public/ecolytiq-sustainability-profile ${project.basedir}/../wayang-assembly/target/apache-wayang-assembly-0.7.1-incubating-dist/wayang-0.7.1 - 3.5.0 - 11 - 11 + 3.4.1 + 17 + 17 UTF-8 - 3.17.2 - 2.5 - 19.0 - 1.3 - 2.10.2 - 0.8.5 + 3.25.3 + 2.15.1 + 32.1.3-jre + 2.2 + 2.15.4 + 0.8.9 2.10.6 2.4.0 - 5.6.1 - 3.5.10 + 5.10.2 + 5.11.0 @@ -92,7 +92,7 @@ org.scala-lang scala-library - 2.12.0 + 2.12.17 org.slf4j diff --git a/wayang-benchmark/pom.xml b/wayang-benchmark/pom.xml index ff0516aba..574dd3f3a 100644 --- a/wayang-benchmark/pom.xml +++ b/wayang-benchmark/pom.xml @@ -67,17 +67,17 @@ org.apache.hadoop hadoop-aws - 3.1.2 + 3.3.6 org.apache.hadoop hadoop-common - 3.2.4 + 3.3.6 org.apache.hadoop hadoop-client - 3.1.2 + 3.3.6 org.apache.wayang diff --git a/wayang-commons/wayang-core/pom.xml b/wayang-commons/wayang-core/pom.xml index 348876dde..784a54187 100644 --- a/wayang-commons/wayang-core/pom.xml +++ b/wayang-commons/wayang-core/pom.xml @@ -84,7 +84,7 @@ org.antlr antlr4-runtime - 4.9.1 + 4.13.1 org.apache.logging.log4j @@ -99,9 +99,20 @@ wayang-utils-profile-db 1.0.1-SNAPSHOT + + com.fasterxml.jackson.core + jackson-core + 2.15.4 + com.fasterxml.jackson.core jackson-databind + 2.15.4 + + + com.fasterxml.jackson.core + jackson-annotations + 2.15.4 com.amazonaws diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/ExplainUtils.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/ExplainUtils.java index f9cfd4d1d..1a5bed5fd 100644 --- a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/ExplainUtils.java +++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/ExplainUtils.java @@ -18,7 +18,7 @@ package org.apache.wayang.core.util; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.wayang.core.api.exception.WayangException; import org.apache.wayang.core.plan.executionplan.ExecutionPlan; import org.apache.wayang.core.plan.wayangplan.WayangPlan; diff --git a/wayang-commons/wayang-utils-profile-db/pom.xml b/wayang-commons/wayang-utils-profile-db/pom.xml index 47f3d76be..9368364bf 100644 --- a/wayang-commons/wayang-utils-profile-db/pom.xml +++ b/wayang-commons/wayang-utils-profile-db/pom.xml @@ -38,4 +38,5 @@ + diff --git a/wayang-ml4all/pom.xml b/wayang-ml4all/pom.xml index 17897da43..a07e0140c 100644 --- a/wayang-ml4all/pom.xml +++ b/wayang-ml4all/pom.xml @@ -77,7 +77,7 @@ org.apache.spark spark-core_2.12 - 3.3.3 + 3.4.1 org.apache.spark @@ -97,7 +97,7 @@ org.scala-lang scala-library - 2.12.12 + 2.12.17 diff --git a/wayang-platforms/wayang-java/pom.xml b/wayang-platforms/wayang-java/pom.xml index ccc80b438..a6274a2cf 100644 --- a/wayang-platforms/wayang-java/pom.xml +++ b/wayang-platforms/wayang-java/pom.xml @@ -76,13 +76,13 @@ org.apache.logging.log4j log4j-slf4j-impl - 2.17.2 + 2.20.0 org.mockito mockito-core - 4.5.1 + 5.11.0 test diff --git a/wayang-platforms/wayang-spark/pom.xml b/wayang-platforms/wayang-spark/pom.xml index 7cb6a6a48..6d5931292 100644 --- a/wayang-platforms/wayang-spark/pom.xml +++ b/wayang-platforms/wayang-spark/pom.xml @@ -64,12 +64,12 @@ org.apache.hadoop hadoop-common - 3.4.0 + 3.3.6 org.apache.hadoop hadoop-client - 2.7.7 + 3.3.6 org.apache.kafka @@ -79,7 +79,7 @@ org.apache.spark spark-core_2.12 - ${spark.version} + 3.4.1 org.xerial.snappy @@ -90,12 +90,17 @@ org.apache.spark spark-graphx_2.12 - ${spark.version} + 3.4.1 + + + org.apache.spark + spark-sql_2.12 + 3.4.1 org.apache.spark spark-mllib_2.12 - ${spark.version} + 3.4.1 @@ -108,7 +113,7 @@ com.fasterxml.jackson.core jackson-databind - 2.10.0 + ${jackson.version} diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkRandomPartitionSampleOperator.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkRandomPartitionSampleOperator.java index 25212bef9..ce84f78d0 100644 --- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkRandomPartitionSampleOperator.java +++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkRandomPartitionSampleOperator.java @@ -284,4 +284,4 @@ public List apply(scala.collection.Iterator iterator) { list.add(element); return list; } -} +} \ No newline at end of file diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkShufflePartitionSampleOperator.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkShufflePartitionSampleOperator.java index 3169da7bb..614734063 100644 --- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkShufflePartitionSampleOperator.java +++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkShufflePartitionSampleOperator.java @@ -230,4 +230,4 @@ public List apply(scala.collection.Iterator iterator) { } return list; } -} +} \ No newline at end of file diff --git a/wayang-platforms/wayang-tensorflow/pom.xml b/wayang-platforms/wayang-tensorflow/pom.xml index 5d8be0968..c6faec028 100644 --- a/wayang-platforms/wayang-tensorflow/pom.xml +++ b/wayang-platforms/wayang-tensorflow/pom.xml @@ -34,9 +34,9 @@ - 11 - 11 - 1.0.0-rc.2 + 17 + 17 + 1.0.0 diff --git a/wayang-profiler/pom.xml b/wayang-profiler/pom.xml index bd44e442c..e32b07ada 100644 --- a/wayang-profiler/pom.xml +++ b/wayang-profiler/pom.xml @@ -94,5 +94,11 @@ spark-mllib_2.12 ${spark.version} + + org.apache.commons + commons-lang3 + 3.12.0 + + diff --git a/wayang-profiler/src/main/java/org/apache/wayang/profiler/hardware/DiskProfiler.java b/wayang-profiler/src/main/java/org/apache/wayang/profiler/hardware/DiskProfiler.java index f72b37da3..9373b6672 100644 --- a/wayang-profiler/src/main/java/org/apache/wayang/profiler/hardware/DiskProfiler.java +++ b/wayang-profiler/src/main/java/org/apache/wayang/profiler/hardware/DiskProfiler.java @@ -18,7 +18,7 @@ package org.apache.wayang.profiler.hardware; -import org.apache.commons.lang.Validate; +import org.apache.commons.lang3.Validate; import org.apache.wayang.core.util.Formats; import org.apache.wayang.core.util.fs.FileSystem; import org.apache.wayang.core.util.fs.FileSystems; From 96f1573c6e56ef4c2083fa9fb4ff62d9cc1ab0e4 Mon Sep 17 00:00:00 2001 From: xristlamp Date: Mon, 14 Apr 2025 11:31:58 +0200 Subject: [PATCH 3/7] Updated project to Java 17 with spark 3.4.4 and scala 2.12.17 --- pom.xml | 9 ++--- wayang-api/wayang-api-scala-java/pom.xml | 2 +- wayang-applications/pom.xml | 4 +-- wayang-ml4all/pom.xml | 2 +- wayang-platforms/wayang-java/pom.xml | 2 +- wayang-platforms/wayang-spark/pom.xml | 15 +++++--- wayang-tests-integration/pom.xml | 44 +++++++++++++++++++++--- 7 files changed, 59 insertions(+), 19 deletions(-) diff --git a/pom.xml b/pom.xml index 3110072c3..37bdfcfa3 100644 --- a/pom.xml +++ b/pom.xml @@ -104,7 +104,7 @@ 2.12.17 2.12 - 3.4.1 + 3.4.4 1.20.0 17 @@ -449,7 +449,7 @@ 2.12.17 2.12 - 3.4.1 + 3.4.4 @@ -963,16 +963,17 @@ --add-exports=java.base/sun.nio.ch=ALL-UNNAMED + --add-exports=java.base/jdk.internal.misc=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED - --add-exports=java.base/jdk.internal.misc=ALL-UNNAMED --add-opens=java.base/sun.reflect.annotation=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED - --add-opens java.base/java.net=ALL-UNNAMED + --add-opens=java.base/java.net=ALL-UNNAMED + --add-opens=java.base/java.lang.invoke=ALL-UNNAMED false diff --git a/wayang-api/wayang-api-scala-java/pom.xml b/wayang-api/wayang-api-scala-java/pom.xml index 9af47e67b..d137ff3ce 100644 --- a/wayang-api/wayang-api-scala-java/pom.xml +++ b/wayang-api/wayang-api-scala-java/pom.xml @@ -131,7 +131,7 @@ io.netty netty-all - 4.1.45.Final + 4.1.89.Final com.google.protobuf diff --git a/wayang-applications/pom.xml b/wayang-applications/pom.xml index b5294b3ca..723978905 100644 --- a/wayang-applications/pom.xml +++ b/wayang-applications/pom.xml @@ -38,7 +38,7 @@ https://kamir.solidcommunity.net/public/ecolytiq-sustainability-profile ${project.basedir}/../wayang-assembly/target/apache-wayang-assembly-0.7.1-incubating-dist/wayang-0.7.1 - 3.4.1 + 3.4.4 17 17 @@ -97,7 +97,7 @@ org.slf4j slf4j-simple - 1.7.13 + 2.0.6 diff --git a/wayang-ml4all/pom.xml b/wayang-ml4all/pom.xml index a07e0140c..e0737f54c 100644 --- a/wayang-ml4all/pom.xml +++ b/wayang-ml4all/pom.xml @@ -77,7 +77,7 @@ org.apache.spark spark-core_2.12 - 3.4.1 + 3.4.4 org.apache.spark diff --git a/wayang-platforms/wayang-java/pom.xml b/wayang-platforms/wayang-java/pom.xml index a6274a2cf..461bb75af 100644 --- a/wayang-platforms/wayang-java/pom.xml +++ b/wayang-platforms/wayang-java/pom.xml @@ -71,7 +71,7 @@ org.slf4j slf4j-api - 1.7.36 + 2.0.6 org.apache.logging.log4j diff --git a/wayang-platforms/wayang-spark/pom.xml b/wayang-platforms/wayang-spark/pom.xml index 6d5931292..36a154c1d 100644 --- a/wayang-platforms/wayang-spark/pom.xml +++ b/wayang-platforms/wayang-spark/pom.xml @@ -59,7 +59,7 @@ org.xerial.snappy snappy-java 1.1.10.4 - ${external.platforms.scope} + compile org.apache.hadoop @@ -79,7 +79,7 @@ org.apache.spark spark-core_2.12 - 3.4.1 + 3.4.4 org.xerial.snappy @@ -90,17 +90,17 @@ org.apache.spark spark-graphx_2.12 - 3.4.1 + 3.4.4 org.apache.spark spark-sql_2.12 - 3.4.1 + 3.4.4 org.apache.spark spark-mllib_2.12 - 3.4.1 + 3.4.4 @@ -115,6 +115,11 @@ ${jackson.version} + + org.antlr + antlr4-runtime + 4.8 + diff --git a/wayang-tests-integration/pom.xml b/wayang-tests-integration/pom.xml index d498b19e0..d6509e345 100644 --- a/wayang-tests-integration/pom.xml +++ b/wayang-tests-integration/pom.xml @@ -91,17 +91,17 @@ org.apache.spark spark-core_2.12 - ${spark.version} + 3.4.4 org.apache.spark spark-graphx_2.12 - ${spark.version} + 3.4.4 org.apache.spark spark-mllib_2.12 - ${spark.version} + 3.4.4 @@ -224,13 +224,13 @@ org.codehaus.janino janino - 3.0.16 + 3.1.6 org.codehaus.janino commons-compiler - 3.0.16 + 3.1.6 @@ -263,6 +263,24 @@ org.apache.maven.plugins maven-failsafe-plugin + 2.22.2 + + + --add-exports=java.base/sun.nio.ch=ALL-UNNAMED + --add-opens=java.base/java.nio=ALL-UNNAMED + --add-opens=java.base/sun.nio.ch=ALL-UNNAMED + --add-opens=java.base/java.lang=ALL-UNNAMED + --add-opens=java.base/java.util=ALL-UNNAMED + --add-opens=java.base/java.io=ALL-UNNAMED + --add-opens=java.base/java.lang.reflect=ALL-UNNAMED + --add-exports=java.base/jdk.internal.misc=ALL-UNNAMED + --add-opens=java.base/sun.reflect.annotation=ALL-UNNAMED + --add-opens=java.base/java.util.concurrent=ALL-UNNAMED + --add-opens=java.base/java.net=ALL-UNNAMED + --add-opens=java.base/java.lang.invoke=ALL-UNNAMED + + false + @@ -273,10 +291,26 @@ + org.apache.maven.plugins maven-surefire-plugin + 3.0.0 + + --add-exports=java.base/sun.nio.ch=ALL-UNNAMED + --add-opens=java.base/java.nio=ALL-UNNAMED + --add-opens=java.base/sun.nio.ch=ALL-UNNAMED + --add-opens=java.base/java.lang=ALL-UNNAMED + --add-opens=java.base/java.util=ALL-UNNAMED + --add-opens=java.base/java.io=ALL-UNNAMED + --add-opens=java.base/java.lang.reflect=ALL-UNNAMED + --add-exports=java.base/jdk.internal.misc=ALL-UNNAMED + --add-opens=java.base/sun.reflect.annotation=ALL-UNNAMED + --add-opens=java.base/java.util.concurrent=ALL-UNNAMED + --add-opens=java.base/java.net=ALL-UNNAMED + --add-opens=java.base/java.lang.invoke=ALL-UNNAMED + false false From 602d2f8f903ad15269ce074af9419b22d42e00db Mon Sep 17 00:00:00 2001 From: xristlamp Date: Mon, 5 May 2025 16:09:18 +0200 Subject: [PATCH 4/7] Update README with Java 17 compatibility instructions --- README.md | 59 +++++++++++++++++++++++++++++++++++++--------- guides/tutorial.md | 15 ++++++++++++ 2 files changed, 63 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 6d80678f3..3e2eec588 100644 --- a/README.md +++ b/README.md @@ -81,11 +81,13 @@ source ~/.zshrc ### Requirements at Runtime -Since Apache Wayang (incubating) is not an execution engine itself but rather manages the execution engines for you, it is important to have the necessary requirements installed. +Apache Wayang (incubating) relies on external execution engines and Java to function correctly. Below are the updated runtime requirements: -- Apache Wayang supports Java versions 8 and above. However, the Wayang team recommends using Java version 11. Don’t forget to set the `JAVA_HOME` environment variable. -- You need to install Apache Spark version 3 or higher. Don’t forget to set the `SPARK_HOME` environment variable. -- You need to install Apache Hadoop version 3 or higher. Don’t forget to set the `HADOOP_HOME` environment variable. +- **Java 17**: Make sure `JAVA_HOME` is correctly set to your Java 17 installation. +- **Apache Spark 3.4.4**: Compatible with Scala 2.12. Set the `SPARK_HOME` environment variable. +- **Apache Hadoop 3+**: Set the `HADOOP_HOME` environment variable. + +> 🛠️ **Note:** When using Java 17, you _must_ add JVM flags to allow Wayang and Spark to access internal Java APIs, or you will encounter `IllegalAccessError`. See below. ### Validating the installation @@ -95,6 +97,26 @@ To execute your first application with Apache Wayang, you need to execute your p bin/wayang-submit org.apache.wayang.apps.wordcount.Main java file://$(pwd)/README.md ``` +### ⚙️ Java 17 Compatibility + +When running Wayang applications using Java 17 (especially with Spark), you must add JVM flags to open specific internal Java modules. These flags resolve access issues with `sun.nio.ch.DirectBuffer` and others. + +Update your `wayang-submit` (wayang-assembly/target/wayang-1.0.1-SNAPSHOT/bin/wayang-submit) script (or command) with: + +```bash +eval "$RUNNER \ + --add-exports=java.base/sun.nio.ch=ALL-UNNAMED \ + --add-opens=java.base/java.nio=ALL-UNNAMED \ + --add-opens=java.base/java.lang=ALL-UNNAMED \ + --add-opens=java.base/java.util=ALL-UNNAMED \ + --add-opens=java.base/java.io=ALL-UNNAMED \ + --add-opens=java.base/java.lang.reflect=ALL-UNNAMED \ + --add-opens=java.base/java.util.concurrent=ALL-UNNAMED \ + --add-opens=java.base/java.net=ALL-UNNAMED \ + --add-opens=java.base/java.lang.invoke=ALL-UNNAMED \ + $FLAGS -cp \"${WAYANG_CLASSPATH}\" $CLASS ${ARGS}" +``` + ## Getting Started Wayang is available via Maven Central. To use it with Maven, include the following code snippet into your POM file: @@ -128,15 +150,17 @@ In addition, you can obtain the most recent snapshot version of Wayang via Sonat ``` ### Prerequisites -Apache Wayang (incubating) is built with Java 11 and Scala 2.12. However, to run Apache Wayang it is sufficient to have just Java 11 installed. Please also consider that processing platforms employed by Wayang might have further requirements. +Apache Wayang (incubating) is built with Java 17 and Scala 2.12. However, to run Apache Wayang it is sufficient to have just Java 17 installed. Please also consider that processing platforms employed by Wayang might have further requirements. ``` -Java 11 -[Scala 2.12] +Java 17 +Scala 2.12.17 +Spark 3.4.4, Compatible with Scala 2.12. +Maven ``` > **NOTE:** In windows, you need to define the variable `HADOOP_HOME` with the winutils.exe, an not official option to obtain [this repository](https://github.com/steveloughran/winutils), or you can generate your winutils.exe following the instructions in the repository. Also, you may need to install [msvcr100.dll](https://www.microsoft.com/en-us/download/details.aspx?id=26999) -> **NOTE:** Make sure that the JAVA_HOME environment variable is set correctly to Java 11 as the prerequisite checker script currently supports up to Java 11 and checks the latest version of Java if you have higher version installed. In Linux, it is preferably to use the export JAVA_HOME method inside the project folder. It is also recommended running './mvnw clean install' before opening the project using IntelliJ. +> **NOTE:** Make sure that the JAVA_HOME environment variable is set correctly to Java 17 as the prerequisite checker script currently supports up to Java 17 and checks the latest version of Java if you have higher version installed. In Linux, it is preferably to use the export JAVA_HOME method inside the project folder. It is also recommended running './mvnw clean install' before opening the project using IntelliJ. ### Building @@ -152,11 +176,24 @@ If you need to rebuild Wayang, e.g., to use a different Scala version, you can s ``` > **NOTE:** If you receive an error about not finding `MathExBaseVisitor`, then the problem might be that you are trying to build from IntelliJ, without Maven. MathExBaseVisitor is generated code, and a Maven build should generate it automatically. -> **NOTE:** In the current Maven setup, the version of scala is tied to the Java version, you can compile the profile `scala-11` with Java 8 and profile `scala-12` with Java 11. +> **NOTE:**: In the current Maven setup, Wayang supports Java 17. The default Scala version is 2.12.17, which is compatible with Java 17. Ensure that your Spark distribution is also built with Scala 2.12 (e.g., `spark-3.4.4-bin-hadoop3-scala2.12`). > **NOTE:** For compiling and testing the code it is required to have Hadoop installed on your machine. > **NOTE:** the `standalone` profile to fix Hadoop and Spark versions, so that Wayang apps do not explicitly need to declare the corresponding dependencies. + +> **NOTE**: When running applications (e.g., WordCount) with Java 17, you must pass additional flags to allow internal module access: + +>--add-exports=java.base/sun.nio.ch=ALL-UNNAMED \ +--add-opens=java.base/java.nio=ALL-UNNAMED \ +--add-opens=java.base/java.lang=ALL-UNNAMED \ +--add-opens=java.base/java.util=ALL-UNNAMED \ +--add-opens=java.base/java.io=ALL-UNNAMED \ +--add-opens=java.base/java.lang.reflect=ALL-UNNAMED \ +--add-opens=java.base/java.util.concurrent=ALL-UNNAMED \ +--add-opens=java.base/java.net=ALL-UNNAMED \ +--add-opens=java.base/java.lang.invoke=ALL-UNNAMED \ + > > Also, note the `distro` profile, which assembles a binary Wayang distribution. To activate these profiles, you need to specify them when running maven, i.e., @@ -176,8 +213,8 @@ You can see examples on how to start using Wayang [here](guides/wayang-examples. ## Built With -* [Java 11](https://www.oracle.com/de/java/technologies/javase/jdk11-archive-downloads.html) -* [Scala 2.12](https://www.scala-lang.org/download/2.12.0.html) +* [Java 17](https://www.oracle.com/java/technologies/javase/17-0-14-relnotes.html) +* [Scala 2.12.17](https://www.scala-lang.org/download/2.12.17.html) * [Maven](https://maven.apache.org/) ## Contributing diff --git a/guides/tutorial.md b/guides/tutorial.md index 320b96169..d7037cd88 100644 --- a/guides/tutorial.md +++ b/guides/tutorial.md @@ -63,6 +63,21 @@ To execute the WordCount example with Apache Wayang, you need to execute your pr cd wayang-1.0.1-SNAPSHOT ./bin/wayang-submit org.apache.wayang.apps.wordcount.Main java file://$(pwd)/README.md ``` +## If you're using Java 17, add the following JVM flags: +Update your `wayang-submit` (wayang-assembly/target/wayang-1.0.1-SNAPSHOT/bin/wayang-submit) script (or command) with: + +```shell +--add-exports=java.base/sun.nio.ch=ALL-UNNAMED \ +--add-opens=java.base/java.nio=ALL-UNNAMED \ +--add-opens=java.base/java.lang=ALL-UNNAMED \ +--add-opens=java.base/java.util=ALL-UNNAMED \ +--add-opens=java.base/java.io=ALL-UNNAMED \ +--add-opens=java.base/java.lang.reflect=ALL-UNNAMED \ +--add-opens=java.base/java.util.concurrent=ALL-UNNAMED \ +--add-opens=java.base/java.net=ALL-UNNAMED \ +--add-opens=java.base/java.lang.invoke=ALL-UNNAMED +``` + Then you should be able to see outputs like this: ![img.png](../images/wordcount_result.png) From 0ce0670c4cef8889e514b910669dacd685db06ad Mon Sep 17 00:00:00 2001 From: xristlamp Date: Mon, 5 May 2025 16:10:54 +0200 Subject: [PATCH 5/7] Update README with Java 17 compatibility instructions --- guides/tutorial.md | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/guides/tutorial.md b/guides/tutorial.md index d7037cd88..0dfb914ff 100644 --- a/guides/tutorial.md +++ b/guides/tutorial.md @@ -67,15 +67,17 @@ cd wayang-1.0.1-SNAPSHOT Update your `wayang-submit` (wayang-assembly/target/wayang-1.0.1-SNAPSHOT/bin/wayang-submit) script (or command) with: ```shell ---add-exports=java.base/sun.nio.ch=ALL-UNNAMED \ ---add-opens=java.base/java.nio=ALL-UNNAMED \ ---add-opens=java.base/java.lang=ALL-UNNAMED \ ---add-opens=java.base/java.util=ALL-UNNAMED \ ---add-opens=java.base/java.io=ALL-UNNAMED \ ---add-opens=java.base/java.lang.reflect=ALL-UNNAMED \ ---add-opens=java.base/java.util.concurrent=ALL-UNNAMED \ ---add-opens=java.base/java.net=ALL-UNNAMED \ ---add-opens=java.base/java.lang.invoke=ALL-UNNAMED +eval "$RUNNER \ + --add-exports=java.base/sun.nio.ch=ALL-UNNAMED \ + --add-opens=java.base/java.nio=ALL-UNNAMED \ + --add-opens=java.base/java.lang=ALL-UNNAMED \ + --add-opens=java.base/java.util=ALL-UNNAMED \ + --add-opens=java.base/java.io=ALL-UNNAMED \ + --add-opens=java.base/java.lang.reflect=ALL-UNNAMED \ + --add-opens=java.base/java.util.concurrent=ALL-UNNAMED \ + --add-opens=java.base/java.net=ALL-UNNAMED \ + --add-opens=java.base/java.lang.invoke=ALL-UNNAMED \ + $FLAGS -cp \"${WAYANG_CLASSPATH}\" $CLASS ${ARGS}" ``` Then you should be able to see outputs like this: From 08f8ded5ff67bd6eef93a1828e4f4c8fefcd5aa6 Mon Sep 17 00:00:00 2001 From: xristlamp Date: Tue, 6 May 2025 17:51:35 +0200 Subject: [PATCH 6/7] Add LogisticRegressionOperator --- .../basic/model/LogisticRegressionModel.java | 26 ++++ .../operators/LogisticRegressionOperator.java | 57 +++++++ .../basic/operators/PredictOperators.java | 3 + .../apache/wayang/spark/mapping/Mappings.java | 1 + .../mapping/ml/LogisticRegressionMapping.java | 57 +++++++ .../ml/SparkLogisticRegressionOperator.java | 144 ++++++++++++++++++ .../wayang/tests/SparkIntegrationIT.java | 44 ++++++ 7 files changed, 332 insertions(+) create mode 100644 wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/model/LogisticRegressionModel.java create mode 100644 wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/LogisticRegressionOperator.java create mode 100644 wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/ml/LogisticRegressionMapping.java create mode 100644 wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/ml/SparkLogisticRegressionOperator.java diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/model/LogisticRegressionModel.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/model/LogisticRegressionModel.java new file mode 100644 index 000000000..fb1f961e3 --- /dev/null +++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/model/LogisticRegressionModel.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.basic.model; + + + + +public interface LogisticRegressionModel extends Model { + // No extra methods for now — predictions handled via SparkMLModel +} diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/LogisticRegressionOperator.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/LogisticRegressionOperator.java new file mode 100644 index 000000000..9f07841a1 --- /dev/null +++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/LogisticRegressionOperator.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.basic.operators; + + +import org.apache.wayang.basic.model.LogisticRegressionModel; +import org.apache.wayang.core.api.Configuration; +import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator; +import org.apache.wayang.core.plan.wayangplan.BinaryToUnaryOperator; +import org.apache.wayang.core.types.DataSetType; + +import java.util.Optional; + +public class LogisticRegressionOperator extends BinaryToUnaryOperator { + + protected boolean fitIntercept; + + public LogisticRegressionOperator(boolean fitIntercept) { + super( + DataSetType.createDefaultUnchecked(double[].class), + DataSetType.createDefaultUnchecked(Double.class), + DataSetType.createDefaultUnchecked(LogisticRegressionModel.class), + false + ); + this.fitIntercept = fitIntercept; + } + + public LogisticRegressionOperator(LogisticRegressionOperator that) { + super(that); + this.fitIntercept = that.fitIntercept; + } + + public boolean getFitIntercept() { + return this.fitIntercept; + } + + @Override + public Optional createCardinalityEstimator(int outputIndex, Configuration configuration) { + return super.createCardinalityEstimator(outputIndex, configuration); + } +} diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/PredictOperators.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/PredictOperators.java index 869d5f710..7a337c802 100644 --- a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/PredictOperators.java +++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/PredictOperators.java @@ -35,4 +35,7 @@ public static PredictOperator linearRegression() { public static PredictOperator decisionTreeClassification() { return new PredictOperator<>(new TypeReference() {}, new TypeReference() {}); } + public static PredictOperator logisticRegression() { + return new PredictOperator<>(new TypeReference() {}, new TypeReference() {}); + } } diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java index b884acf95..f22f2f3b5 100644 --- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java +++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java @@ -73,6 +73,7 @@ public class Mappings { new LinearRegressionMapping(), new DecisionTreeClassificationMapping(), new ModelTransformMapping(), + new LogisticRegressionMapping(), new PredictMapping() ); diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/ml/LogisticRegressionMapping.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/ml/LogisticRegressionMapping.java new file mode 100644 index 000000000..5a5d53af9 --- /dev/null +++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/ml/LogisticRegressionMapping.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.wayang.spark.mapping.ml; + +import org.apache.wayang.basic.operators.LogisticRegressionOperator; +import org.apache.wayang.core.mapping.*; +import org.apache.wayang.spark.operators.ml.SparkLogisticRegressionOperator; +import org.apache.wayang.spark.platform.SparkPlatform; + +import java.util.Collection; +import java.util.Collections; + +/** + * Mapping from {@link LogisticRegressionOperator} to {@link SparkLogisticRegressionOperator}. + */ +@SuppressWarnings("unchecked") +public class LogisticRegressionMapping implements Mapping { + + @Override + public Collection getTransformations() { + return Collections.singleton(new PlanTransformation( + this.createSubplanPattern(), + this.createReplacementSubplanFactory(), + SparkPlatform.getInstance() + )); + } + + private SubplanPattern createSubplanPattern() { + final OperatorPattern operatorPattern = new OperatorPattern( + "logisticRegression", new LogisticRegressionOperator(true), false + ); + return SubplanPattern.createSingleton(operatorPattern); + } + + private ReplacementSubplanFactory createReplacementSubplanFactory() { + return new ReplacementSubplanFactory.OfSingleOperators( + (matchedOperator, epoch) -> new SparkLogisticRegressionOperator(matchedOperator).at(epoch) + ); + } +} diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/ml/SparkLogisticRegressionOperator.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/ml/SparkLogisticRegressionOperator.java new file mode 100644 index 000000000..3d47362d0 --- /dev/null +++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/ml/SparkLogisticRegressionOperator.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.spark.operators.ml; + + + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.ml.classification.LogisticRegression; +import org.apache.spark.ml.classification.LogisticRegressionModel; +import org.apache.spark.ml.linalg.Vector; +import org.apache.spark.ml.linalg.VectorUDT; +import org.apache.spark.ml.linalg.Vectors; +import org.apache.spark.sql.*; +import org.apache.spark.sql.types.*; +import org.apache.wayang.basic.data.Tuple2; +import org.apache.wayang.basic.operators.LogisticRegressionOperator; +import org.apache.wayang.core.optimizer.OptimizationContext; +import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; +import org.apache.wayang.core.platform.ChannelDescriptor; +import org.apache.wayang.core.platform.ChannelInstance; +import org.apache.wayang.core.platform.lineage.ExecutionLineageNode; +import org.apache.wayang.core.util.Tuple; +import org.apache.wayang.java.channels.CollectionChannel; +import org.apache.wayang.spark.channels.RddChannel; +import org.apache.wayang.spark.execution.SparkExecutor; +import org.apache.wayang.spark.model.SparkMLModel; +import org.apache.wayang.spark.operators.SparkExecutionOperator; + +import java.util.*; + +public class SparkLogisticRegressionOperator extends LogisticRegressionOperator implements SparkExecutionOperator { + + private static final StructType SCHEMA = DataTypes.createStructType(new StructField[]{ + DataTypes.createStructField(Attr.LABEL, DataTypes.DoubleType, false), + DataTypes.createStructField(Attr.FEATURES, new VectorUDT(), false) + }); + + private static Dataset convertToDataFrame(JavaRDD features, JavaRDD labels) { + JavaPairRDD zipped = features.zip(labels); + JavaRDD rowRDD = zipped.map(e -> RowFactory.create(e._2, Vectors.dense(e._1))); + return SparkSession.builder().getOrCreate().createDataFrame(rowRDD, SCHEMA); + } + + public SparkLogisticRegressionOperator(boolean fitIntercept) { + super(fitIntercept); + } + + public SparkLogisticRegressionOperator(LogisticRegressionOperator that) { + super(that); + } + + @Override + public List getSupportedInputChannels(int index) { + return Arrays.asList(RddChannel.UNCACHED_DESCRIPTOR, RddChannel.CACHED_DESCRIPTOR); + } + + @Override + public List getSupportedOutputChannels(int index) { + return Collections.singletonList(CollectionChannel.DESCRIPTOR); + } + + @Override + public Tuple, Collection> evaluate( + ChannelInstance[] inputs, + ChannelInstance[] outputs, + SparkExecutor sparkExecutor, + OptimizationContext.OperatorContext operatorContext) { + + RddChannel.Instance featuresInput = (RddChannel.Instance) inputs[0]; + RddChannel.Instance labelsInput = (RddChannel.Instance) inputs[1]; + CollectionChannel.Instance output = (CollectionChannel.Instance) outputs[0]; + + JavaRDD features = featuresInput.provideRdd(); + JavaRDD labels = labelsInput.provideRdd(); + + Dataset df = convertToDataFrame(features, labels); + + LogisticRegression lr = new LogisticRegression() + .setFitIntercept(this.fitIntercept) + .setLabelCol(Attr.LABEL) + .setFeaturesCol(Attr.FEATURES) + .setPredictionCol(Attr.PREDICTION); + + LogisticRegressionModel model = lr.fit(df); + output.accept(Collections.singletonList(new Model(model))); + + return ExecutionOperator.modelLazyExecution(inputs, outputs, operatorContext); + } + + @Override + public boolean containsAction() { + return false; + } + + public static class Model implements org.apache.wayang.basic.model.LogisticRegressionModel, SparkMLModel { + + private static final StructType PREDICT_SCHEMA = DataTypes.createStructType(new StructField[]{ + DataTypes.createStructField(Attr.FEATURES, new VectorUDT(), false) + }); + + private final LogisticRegressionModel model; + + public Model(LogisticRegressionModel model) { + this.model = model; + } + + private static Dataset convertToDataFrame(JavaRDD features) { + JavaRDD rows = features.map(f -> RowFactory.create(Vectors.dense(f))); + return SparkSession.builder().getOrCreate().createDataFrame(rows, PREDICT_SCHEMA); + } + + @Override + public JavaRDD> transform(JavaRDD input) { + Dataset df = convertToDataFrame(input); + Dataset result = model.transform(df); + return result.toJavaRDD().map(row -> + new Tuple2<>(row.getAs(Attr.FEATURES), row.getAs(Attr.PREDICTION))); + } + + @Override + public JavaRDD predict(JavaRDD input) { + Dataset df = convertToDataFrame(input); + Dataset result = model.transform(df); + return result.toJavaRDD().map(row -> row.getAs(Attr.PREDICTION)); + } + } +} diff --git a/wayang-tests-integration/src/test/java/org/apache/wayang/tests/SparkIntegrationIT.java b/wayang-tests-integration/src/test/java/org/apache/wayang/tests/SparkIntegrationIT.java index ccea3d9f0..ac89bfae9 100644 --- a/wayang-tests-integration/src/test/java/org/apache/wayang/tests/SparkIntegrationIT.java +++ b/wayang-tests-integration/src/test/java/org/apache/wayang/tests/SparkIntegrationIT.java @@ -449,6 +449,50 @@ public void testBroadcasts() { Assert.assertEquals(expectedValues, collectedValues); } + @Test + public void testLogisticRegressionOperator() { + CollectionSource xSource = new CollectionSource<>( + Arrays.asList( + new double[]{0.0, 1.0}, + new double[]{1.0, 0.0}, + new double[]{1.0, 1.0}, + new double[]{0.0, 0.0} + ), double[].class + ); + CollectionSource ySource = new CollectionSource<>( + Arrays.asList(1.0, 1.0, 0.0, 0.0), Double.class + ); + + xSource.addTargetPlatform(Spark.platform()); + ySource.addTargetPlatform(Spark.platform()); + + LogisticRegressionOperator train = new LogisticRegressionOperator(true); + PredictOperator predict = PredictOperators.logisticRegression(); + + List results = new ArrayList<>(); + LocalCallbackSink sink = LocalCallbackSink.createCollectingSink(results, DataSetType.createDefault(Double.class)); + + xSource.connectTo(0, train, 0); + ySource.connectTo(0, train, 1); + train.connectTo(0, predict, 0); + xSource.connectTo(0, predict, 1); + predict.connectTo(0, sink, 0); + + WayangPlan plan = new WayangPlan(sink); + + WayangContext context = new WayangContext() + .with(Spark.basicPlugin()) + .with(Spark.mlPlugin()); + + context.execute(plan); + + Assert.assertEquals(4, results.size()); + for (double pred : results) { + Assert.assertTrue(pred == 0.0 || pred == 1.0); + } + } + + @Test public void testKMeans() { CollectionSource collectionSource = new CollectionSource<>( From 368a695ceb1f31835fc7ff3e57d7852e3f9d8de8 Mon Sep 17 00:00:00 2001 From: xristlamp Date: Thu, 8 May 2025 12:51:09 +0200 Subject: [PATCH 7/7] Update the java version in the workflows --- .github/workflows/backend.yml | 6 +++--- .github/workflows/codeql.yml | 6 +++--- .github/workflows/documentation.yml | 6 +++--- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml index 0dd2dce91..97ef214cb 100644 --- a/.github/workflows/backend.yml +++ b/.github/workflows/backend.yml @@ -40,11 +40,11 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - - name: Set up JDK 11 + - name: Set up JDK 17 uses: actions/setup-java@v2 with: - java-version: 11 - distribution: 'adopt' + java-version: 17 + distribution: 'temurin' - name: Install Protoc run: sudo apt install -y protobuf-compiler - uses: actions/cache@v4 diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml index 55e351ffb..6ffd81c94 100644 --- a/.github/workflows/codeql.yml +++ b/.github/workflows/codeql.yml @@ -36,11 +36,11 @@ jobs: uses: actions/checkout@v2 with: submodules: true - - name: Set up JDK 11 + - name: Set up JDK 17 uses: actions/setup-java@v2 with: - java-version: 11 - distribution: 'adopt' + java-version: 17 + distribution: 'temurin' - name: Install Protoc run: sudo apt install -y protobuf-compiler - uses: actions/cache@v4 diff --git a/.github/workflows/documentation.yml b/.github/workflows/documentation.yml index b2ebba156..b1bf026a9 100644 --- a/.github/workflows/documentation.yml +++ b/.github/workflows/documentation.yml @@ -36,11 +36,11 @@ jobs: API_KEY: $HOME/gems steps: - uses: actions/checkout@v2 - - name: Set up JDK 1.8 + - name: Set up JDK 17 uses: actions/setup-java@v2 with: - java-version: 8 - distribution: 'adopt' + java-version: 17 + distribution: 'temurin' - name: Install ruby run: | sudo apt install -y ruby-full build-essential zlib1g-dev