diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml
index e441c823844..f004879e8bd 100644
--- a/.github/workflows/core.yml
+++ b/.github/workflows/core.yml
@@ -423,7 +423,11 @@ jobs:
           rm -rf spark/interpreter/metastore_db
           ./mvnw verify -pl spark-submit,spark/interpreter -am -Dtest=org/apache/zeppelin/spark/* -Pspark-3.5 -Pspark-scala-2.13 -Phadoop3 -Pintegration -DfailIfNoTests=false ${MAVEN_ARGS}

-  livy-0-7-with-spark-3-4-1-under-python3:
+  # The version combination is based on the following facts:
+  # 1. the official Livy 0.8 binary tarball is built against Spark 2.4
+  # 2. the official Spark 2.4 binary tarball is built against Scala 2.11
+  # 3. Spark 2.4 supports Python 2.7 and 3.4 to 3.7
+  livy-0-8-with-spark-2-4-under-python37:
     runs-on: ubuntu-20.04
     steps:
       - name: Checkout
@@ -449,14 +453,14 @@ jobs:
       - name: install environment
        run: |
          ./mvnw install -DskipTests -pl livy -am ${MAVEN_ARGS}
-          ./testing/downloadSpark.sh "3.4.1" "3"
-          ./testing/downloadLivy.sh "0.7.1-incubating"
-      - name: Setup conda environment with python 3.9 and R
+          ./testing/downloadSpark.sh "2.4.8" "2.7"
+          ./testing/downloadLivy.sh "0.8.0-incubating" "2.11"
+      - name: Setup conda environment with python 3.7 and R
        uses: conda-incubator/setup-miniconda@v2
        with:
-          activate-environment: python_3_with_R
-          environment-file: testing/env_python_3_with_R.yml
-          python-version: 3.9
+          activate-environment: python_37_with_R
+          environment-file: testing/env_python_3.7_with_R.yml
+          python-version: 3.7
           miniforge-variant: Mambaforge
           channels: conda-forge,defaults
           channel-priority: true
@@ -466,7 +470,10 @@
        run: |
          R -e "IRkernel::installspec()"
       - name: run tests
-        run: ./mvnw verify -pl livy -am ${MAVEN_ARGS}
+        run: |
+          export SPARK_HOME=$PWD/spark-2.4.8-bin-hadoop2.7
+          export LIVY_HOME=$PWD/apache-livy-0.8.0-incubating_2.11-bin
+          ./mvnw verify -pl livy -am ${MAVEN_ARGS}

   default-build:
     runs-on: ubuntu-20.04
diff --git a/livy/README.md b/livy/README.md
index 54311d93448..406af566ee9 100644
--- a/livy/README.md
+++ b/livy/README.md
@@ -2,14 +2,13 @@ Livy interpreter for Apache Zeppelin

 # Prerequisities

-You can follow the instructions at [Livy Quick Start](http://livy.io/quickstart.html) to set up livy.
+You can follow the instructions at [Livy Get Started](https://livy.apache.org/get-started/) to set up livy.

 # Run Integration Tests

-You can add integration test to [LivyInterpreter.java](https://github.com/apache/zeppelin/blob/master/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java) and run the integration test either via the CI environment or locally. You need to download livy-0.2 and spark-1.5.2 to local, then use the following script to run the integration test.
+You can add integration tests to [LivyInterpreterIT.java](https://github.com/apache/zeppelin/blob/master/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java) and run them either via the CI environment or locally. You need to download livy-0.8 and spark-2.4.8 locally, then use the following script to run the integration tests.

 ```bash
-#!/usr/bin/env bash
-export LIVY_HOME=<path of livy-0.2>
-export SPARK_HOME=<path of spark-1.5.2>
+export LIVY_HOME=<path of livy-0.8>
+export SPARK_HOME=<path of spark-2.4.8>
 ./mvnw clean verify -pl livy -DfailIfNoTests=false
 ```
diff --git a/livy/pom.xml b/livy/pom.xml
index e004b8424eb..96344673a2d 100644
--- a/livy/pom.xml
+++ b/livy/pom.xml
@@ -37,107 +37,9 @@
     <commons.exec.version>1.3</commons.exec.version>
     <spring.web.version>4.3.0.RELEASE</spring.web.version>
     <spring.security.kerberosclient>1.0.1.RELEASE</spring.security.kerberosclient>
-
-    <livy.version>0.7.1-incubating</livy.version>
-    <spark.version>2.4.8</spark.version>
-    <hadoop.version>${hadoop3.3.version}</hadoop.version>
   </properties>

   <dependencies>
-    <dependency>
-      <groupId>org.apache.livy</groupId>
-      <artifactId>livy-integration-test</artifactId>
-      <version>${livy.version}</version>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.xerial.snappy</groupId>
-          <artifactId>snappy-java</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>spark-core_${scala.binary.version}</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>spark-sql_${scala.binary.version}</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>spark-streaming_${scala.binary.version}</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>spark-hive_${scala.binary.version}</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>spark-repl_${scala.binary.version}</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>spark-yarn_${scala.binary.version}</artifactId>
-        </exclusion>
-        <exclusion>
-          <artifactId>hadoop-client</artifactId>
-          <groupId>org.apache.hadoop</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>hadoop-common</artifactId>
-          <groupId>org.apache.hadoop</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>hadoop-hdfs</artifactId>
-          <groupId>org.apache.hadoop</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>hadoop-yarn-client</artifactId>
-          <groupId>org.apache.hadoop</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>hadoop-yarn-server-tests</artifactId>
-          <groupId>org.apache.hadoop</groupId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.livy</groupId>
-      <artifactId>livy-test-lib</artifactId>
-      <version>${livy.version}</version>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.xerial.snappy</groupId>
-          <artifactId>snappy-java</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>spark-core_${scala.binary.version}</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>spark-sql_${scala.binary.version}</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>spark-streaming_${scala.binary.version}</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>spark-hive_${scala.binary.version}</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>spark-repl_${scala.binary.version}</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>spark-yarn_${scala.binary.version}</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
     <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-exec</artifactId>
@@ -172,26 +74,16 @@
     </dependency>

     <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-client-api</artifactId>
-      <version>${hadoop.version}</version>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-client-runtime</artifactId>
-      <version>${hadoop.version}</version>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
       <scope>test</scope>
     </dependency>

     <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-client-minicluster</artifactId>
-      <version>${hadoop.version}</version>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
       <scope>test</scope>
     </dependency>
-
   </dependencies>
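The pom change above trades the heavyweight livy-integration-test/livy-test-lib harness for two small test-scoped libraries: Awaitility, used by the new WithLivyServer base class below to poll for server readiness, and Mockito, used by the ITs to mock interpreter collaborators. For readers unfamiliar with the two idioms, here is a minimal, self-contained JUnit 5 sketch; the demo class and its Greeter interface are hypothetical and not part of this patch.

```java
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.Test;

// Hypothetical demo class, not part of the patch.
class AwaitilityMockitoDemoTest {

  interface Greeter { // a toy collaborator to mock
    String greet(String name);
  }

  @Test
  void mockitoStubsACollaborator() {
    Greeter greeter = mock(Greeter.class);
    when(greeter.greet("livy")).thenReturn("hello, livy");
    assertEquals("hello, livy", greeter.greet("livy"));
  }

  @Test
  void awaitilityPollsUntilAConditionHolds() {
    AtomicBoolean ready = new AtomicBoolean(false);
    // Simulates a server that becomes ready shortly after startup.
    new Thread(() -> {
      try {
        Thread.sleep(200);
      } catch (InterruptedException ignored) {
        Thread.currentThread().interrupt();
      }
      ready.set(true);
    }).start();
    // Poll every 50 ms and fail if the condition is not met within 5 s --
    // the same atMost/pollInterval/until pattern WithLivyServer uses below.
    await().atMost(5, SECONDS)
        .pollInterval(50, MILLISECONDS)
        .until(ready::get);
  }
}
```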
diff --git a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
index f5b09f69204..16c060011f3 100644
--- a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
+++ b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
@@ -18,8 +18,6 @@
 package org.apache.zeppelin.livy;

 import org.apache.commons.io.IOUtils;
-import org.apache.livy.test.framework.Cluster;
-import org.apache.livy.test.framework.Cluster$;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
@@ -31,9 +29,7 @@
 import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.apache.zeppelin.user.AuthenticationInfo;
-import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,46 +44,23 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;

-@Disabled("FIXME: temporarily disable the broken tests")
-public class LivyInterpreterIT {
-  private static final Logger LOGGER = LoggerFactory.getLogger(LivyInterpreterIT.class);
-  private static Cluster cluster;
+public class LivyInterpreterIT extends WithLivyServer {
+  private static final Logger LOG = LoggerFactory.getLogger(LivyInterpreterIT.class);
   private static Properties properties;

   @BeforeAll
-  public static void setUp() {
+  public static void beforeAll() throws IOException {
     if (!checkPreCondition()) {
       return;
     }
-    cluster = Cluster$.MODULE$.get();
-    LOGGER.info("Starting livy at {}", cluster.livyEndpoint());
+    WithLivyServer.beforeAll();
     properties = new Properties();
-    properties.setProperty("zeppelin.livy.url", cluster.livyEndpoint());
+    properties.setProperty("zeppelin.livy.url", LIVY_ENDPOINT);
     properties.setProperty("zeppelin.livy.session.create_timeout", "120");
     properties.setProperty("zeppelin.livy.spark.sql.maxResult", "100");
     properties.setProperty("zeppelin.livy.displayAppInfo", "false");
   }

-  @AfterAll
-  public static void tearDown() {
-    if (cluster != null) {
-      LOGGER.info("Shutting down livy at {}", cluster.livyEndpoint());
-      cluster.cleanUp();
-    }
-  }
-
-  public static boolean checkPreCondition() {
-    if (System.getenv("LIVY_HOME") == null) {
-      LOGGER.warn(("livy integration is skipped because LIVY_HOME is not set"));
-      return false;
-    }
-    if (System.getenv("SPARK_HOME") == null) {
-      LOGGER.warn(("livy integration is skipped because SPARK_HOME is not set"));
-      return false;
-    }
-    return true;
-  }
-
   @Test
   void testSparkInterpreter() throws InterpreterException {
@@ -141,7 +114,6 @@ private void testRDD(final LivySparkInterpreter sparkInterpreter, boolean isSpar
         .setAuthenticationInfo(authInfo)
         .setInterpreterOut(output)
         .build();
-    ;
     InterpreterResult result = sparkInterpreter.interpret("sc.parallelize(1 to 10).sum()", context);
     assertEquals(InterpreterResult.Code.SUCCESS, result.code(), result.toString());
@@ -294,11 +266,10 @@ private void testDataFrame(LivySparkInterpreter sparkInterpreter,
     assertEquals(InterpreterResult.Code.ERROR, result.code());
     assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());

-    if (!isSpark2) {
-      assertTrue(result.message().get(0).getData().contains("Table not found"));
-    } else {
-      assertTrue(result.message().get(0).getData().contains("Table or view not found"));
-    }
+    String errMsg = result.message().get(0).getData();
+    assertTrue(errMsg.contains("Table not found") ||
+        errMsg.contains("Table or view not found") ||
+        errMsg.contains("TABLE_OR_VIEW_NOT_FOUND"));

     // test sql cancel
     if (sqlInterpreter.getLivyVersion().newerThanEquals(LivyVersion.LIVY_0_3_0)) {
@@ -431,7 +402,7 @@ void testPySparkInterpreter() throws InterpreterException {
       assertTrue(result.message().get(0).getData().contains("[Row(_1=u'hello', _2=20)]")
          || result.message().get(0).getData().contains("[Row(_1='hello', _2=20)]"));
     } else {
-      result = pysparkInterpreter.interpret("df=spark.createDataFrame([(\"hello\",20)])\n"
+      result = pysparkInterpreter.interpret("df=spark.createDataFrame([('hello',20)])\n"
           + "df.collect()", context);
       assertEquals(InterpreterResult.Code.SUCCESS, result.code(), result.toString());
       assertEquals(1, result.message().size());
@@ -485,7 +456,7 @@ public void run() {
   }

   @Test
-  void testSparkInterpreterWithDisplayAppInfo_StringWithoutTruncation()
+  void testSparkInterpreterStringWithoutTruncation()
       throws InterpreterException {
     if (!checkPreCondition()) {
       return;
@@ -493,7 +464,6 @@ void testSparkInterpreterWithDisplayAppInfo_StringWithoutTruncation()
     InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
     interpreterGroup.put("session_1", new ArrayList<Interpreter>());
     Properties properties2 = new Properties(properties);
-    properties2.put("zeppelin.livy.displayAppInfo", "true");
     // enable spark ui because it is disabled by livy integration test
     properties2.put("livy.spark.ui.enabled", "true");
     properties2.put(LivySparkSQLInterpreter.ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE, "false");
@@ -519,21 +489,19 @@ void testSparkInterpreterWithDisplayAppInfo_StringWithoutTruncation()
     try {
       InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
       assertEquals(InterpreterResult.Code.SUCCESS, result.code(), result.toString());
-      assertEquals(2, result.message().size());
-      // check yarn appId and ensure it is not null
-      assertTrue(result.message().get(1).getData().contains("Spark Application Id: application_"));
+      assertEquals(1, result.message().size(), result.toString());

       // html output
       String htmlCode = "println(\"%html <h1> hello </h1>\")";
       result = sparkInterpreter.interpret(htmlCode, context);
       assertEquals(InterpreterResult.Code.SUCCESS, result.code(), result.toString());
-      assertEquals(2, result.message().size());
+      assertEquals(1, result.message().size());
       assertEquals(InterpreterResult.Type.HTML, result.message().get(0).getType());

       // detect spark version
       result = sparkInterpreter.interpret("sc.version", context);
       assertEquals(InterpreterResult.Code.SUCCESS, result.code(), result.toString());
-      assertEquals(2, result.message().size());
+      assertEquals(1, result.message().size());

       boolean isSpark2 = isSpark2(sparkInterpreter, context);

@@ -552,7 +520,7 @@ void testSparkInterpreterWithDisplayAppInfo_StringWithoutTruncation()
             + ".toDF(\"col_1\", \"col_2\")\n"
             + "df.collect()", context);
         assertEquals(InterpreterResult.Code.SUCCESS, result.code(), result.toString());
-        assertEquals(2, result.message().size());
+        assertEquals(1, result.message().size());
         assertTrue(result.message().get(0).getData()
             .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
       }
@@ -673,7 +641,7 @@ void testLivyParams() throws InterpreterException {
     try {
       InterpreterResult result = sparkInterpreter.interpret("sc.version\n" +
          "assert(sc.getConf.get(\"spark.executor.cores\") == \"4\" && " +
-          "sc.getConf.get(\"spark.app.name\") == \"zeppelin-livy\")" 
+          "sc.getConf.get(\"spark.app.name\") == \"zeppelin-livy\")"
           , context);
       assertEquals(InterpreterResult.Code.SUCCESS, result.code(), result.toString());
       assertEquals(1, result.message().size());
diff --git a/livy/src/test/java/org/apache/zeppelin/livy/LivySQLInterpreterTest.java b/livy/src/test/java/org/apache/zeppelin/livy/LivySQLInterpreterTest.java
index 6294473371f..3461c096a45 100644
--- a/livy/src/test/java/org/apache/zeppelin/livy/LivySQLInterpreterTest.java
+++ b/livy/src/test/java/org/apache/zeppelin/livy/LivySQLInterpreterTest.java
@@ -35,7 +35,7 @@ class LivySQLInterpreterTest {
   private LivySparkSQLInterpreter sqlInterpreter;

   @BeforeEach
-  public void setUp() {
+  public void beforeEach() {
     Properties properties = new Properties();
     properties.setProperty("zeppelin.livy.url", "http://localhost:8998");
     properties.setProperty("zeppelin.livy.session.create_timeout", "120");
diff --git a/livy/src/test/java/org/apache/zeppelin/livy/WithLivyServer.java b/livy/src/test/java/org/apache/zeppelin/livy/WithLivyServer.java
new file mode 100644
index 00000000000..d0a77e427ba
--- /dev/null
+++ b/livy/src/test/java/org/apache/zeppelin/livy/WithLivyServer.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.livy;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+abstract class WithLivyServer {
+  private static final Logger LOG = LoggerFactory.getLogger(WithLivyServer.class);
+
+  private static Optional<Process> livy = Optional.empty();
+
+  protected static final String LIVY_HOME = System.getenv("LIVY_HOME");
+  protected static final String SPARK_HOME = System.getenv("SPARK_HOME");
+
+  protected static final File TMP_DIR = new File(System.getProperty("java.io.tmpdir"), "livy-it");
+  protected static final File LIVY_CONF_DIR = new File(TMP_DIR, "conf");
+  protected static final File LIVY_LOG_DIR = new File(TMP_DIR, "logs");
+
+  protected static String LIVY_ENDPOINT;
+
+  public static boolean checkPreCondition() {
+    if (System.getenv("LIVY_HOME") == null) {
+      LOG.warn("livy integration is skipped because LIVY_HOME is not set");
+      return false;
+    }
+    if (System.getenv("SPARK_HOME") == null) {
+      LOG.warn("livy integration is skipped because SPARK_HOME is not set");
+      return false;
+    }
+    return true;
+  }
+
+  @BeforeAll
+  public static void beforeAll() throws IOException {
+    if (!checkPreCondition()) {
+      return;
+    }
+    assertFalse(livy.isPresent());
+    if (TMP_DIR.exists()) {
+      FileUtils.deleteQuietly(TMP_DIR);
+    }
+    assertTrue(LIVY_CONF_DIR.mkdirs());
+    assertTrue(LIVY_LOG_DIR.mkdirs());
+    Files.copy(
+        Paths.get(LIVY_HOME, "conf", "log4j.properties.template"),
+        LIVY_CONF_DIR.toPath().resolve("log4j.properties"));
+
+    LOG.info("SPARK_HOME: {}", SPARK_HOME);
+    LOG.info("LIVY_HOME: {}", LIVY_HOME);
+    LOG.info("LIVY_CONF_DIR: {}", LIVY_CONF_DIR.getAbsolutePath());
+    LOG.info("LIVY_LOG_DIR: {}", LIVY_LOG_DIR.getAbsolutePath());
+
+    File logFile = new File(TMP_DIR, "output.log");
+    assertTrue(logFile.createNewFile());
+    LOG.info("Redirect Livy's log to {}", logFile.getAbsolutePath());
+
+    ProcessBuilder pb = new ProcessBuilder(LIVY_HOME + "/bin/livy-server")
+        .directory(TMP_DIR)
+        .redirectErrorStream(true)
+        .redirectOutput(ProcessBuilder.Redirect.appendTo(logFile));
+
+    pb.environment().put("JAVA_HOME", System.getProperty("java.home"));
+    pb.environment().put("LIVY_CONF_DIR", LIVY_CONF_DIR.getAbsolutePath());
+    pb.environment().put("LIVY_LOG_DIR", LIVY_LOG_DIR.getAbsolutePath());
+    pb.environment().put("SPARK_LOCAL_IP", "127.0.0.1");
+    Process livyProc = pb.start();
+
+    await().atMost(30, SECONDS).pollInterval(2, SECONDS).until(() -> {
+      try {
+        int exitCode = livyProc.exitValue();
+        throw new IOException("Child process exited unexpectedly (exit code " + exitCode + ")");
+      } catch (IllegalThreadStateException ignore) {
+        // The process is still running; keep polling.
+      }
+      try (Stream<String> lines = Files.lines(logFile.toPath(), StandardCharsets.UTF_8)) {
+        // An example of the bootstrap log:
+        // 24/03/24 05:51:38 INFO WebServer: Starting server on http://cheng-pan-mbp.lan:8998
+        Optional<String> started =
+            lines.filter(line -> line.contains("Starting server on ")).findFirst();
+        started.ifPresent(line ->
+            LIVY_ENDPOINT = StringUtils.substringAfter(line, "Starting server on ").trim());
+        return started.isPresent();
+      }
+    });
+
+    LOG.info("Livy Server started at {}", LIVY_ENDPOINT);
+    livy = Optional.of(livyProc);
+  }
+
+  @AfterAll
+  public static void afterAll() {
+    livy.filter(Process::isAlive).ifPresent(proc -> {
+      try {
+        LOG.info("Stopping the Livy Server running at {}", LIVY_ENDPOINT);
+        proc.destroy();
+        if (!proc.waitFor(10, SECONDS)) {
+          LOG.warn("Forcibly stopping the Livy Server running at {}", LIVY_ENDPOINT);
+          proc.destroyForcibly();
+          assertFalse(proc.isAlive());
+        }
+      } catch (InterruptedException ignore) {
+        if (proc.isAlive()) {
+          LOG.warn("Forcibly stopping the Livy Server running at {}", LIVY_ENDPOINT);
+          proc.destroyForcibly();
+          assertFalse(proc.isAlive());
+        }
+      }
+    });
+  }
+}
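WithLivyServer is deliberately reusable: an integration test extends it, guards on checkPreCondition(), and reads LIVY_ENDPOINT once beforeAll() has started the server and parsed its "Starting server on" bootstrap log line, exactly as LivyInterpreterIT does above. A minimal sketch of a hypothetical additional IT (the class name and assertion are illustrative only, not part of the patch):

```java
package org.apache.zeppelin.livy;

import static org.junit.jupiter.api.Assertions.assertNotNull;

import org.junit.jupiter.api.Test;

// Hypothetical example IT, not part of the patch.
class LivyEndpointSmokeIT extends WithLivyServer {

  @Test
  void livyServerExposesAnHttpEndpoint() {
    // Skip silently when LIVY_HOME/SPARK_HOME are not set,
    // mirroring the guard used by LivyInterpreterIT.
    if (!checkPreCondition()) {
      return;
    }
    // The inherited @BeforeAll has already started livy-server
    // and captured its endpoint from the log file.
    assertNotNull(LIVY_ENDPOINT);
  }
}
```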
diff --git a/testing/downloadLivy.sh b/testing/downloadLivy.sh
index 7f2faf3ffe0..f09837a7574 100755
--- a/testing/downloadLivy.sh
+++ b/testing/downloadLivy.sh
@@ -16,13 +16,21 @@
 # limitations under the License.
 #

-if [[ "$#" -ne 1 ]]; then
-  echo "usage) $0 [livy version]"
-  echo "   eg) $0 0.2"
+if [[ "$#" -ne 1 && "$#" -ne 2 ]]; then
+  echo "usage) $0 <livy version> [scala version]"
+  echo "   eg) $0 0.7.1-incubating"
+  echo "       $0 0.8.0-incubating 2.11"
   exit 0
 fi

+# Simple version normalization; see
+# http://stackoverflow.com/questions/16989598/bash-comparing-version-numbers
+function version { echo "$@" | awk -F. '{ printf("%03d%03d%03d\n", $1,$2,$3); }'; }
 LIVY_VERSION="${1}"
+SCALA_VERSION_SUFFIX=""
+if [ $(version $LIVY_VERSION) -ge $(version "0.8.0") ]; then
+  SCALA_VERSION_SUFFIX="_${2}"
+fi

 set -xe

@@ -49,7 +57,7 @@ download_with_retry() {
 }

 LIVY_CACHE=".livy-dist"
-LIVY_ARCHIVE="apache-livy-${LIVY_VERSION}-bin"
+LIVY_ARCHIVE="apache-livy-${LIVY_VERSION}${SCALA_VERSION_SUFFIX}-bin"

 export LIVY_HOME="${ZEPPELIN_HOME}/livy-server-$LIVY_VERSION"
 echo "LIVY_HOME is ${LIVY_HOME}"
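The awk one-liner in downloadLivy.sh zero-pads the first three dot-separated fields so that plain integer comparison orders versions correctly (0.7.1 becomes 000007001, 0.8.0 becomes 000008000); awk's %03d reads the leading digits of a field such as "1-incubating" as 1. A rough Java transliteration of the same normalization, for illustration only (the class and method names are invented, and string comparison of the zero-padded keys is equivalent to the script's numeric -ge test):

```java
// Illustrative transliteration of downloadLivy.sh's `version` helper.
public final class VersionKey {

  /** Pads each of the first three dotted fields of a version to 3 digits. */
  static String versionKey(String version) {
    String[] parts = version.split("[.-]"); // "0.8.0-incubating" -> 0, 8, 0, incubating
    int[] nums = new int[3];
    for (int i = 0; i < 3; i++) {
      try {
        nums[i] = i < parts.length ? Integer.parseInt(parts[i]) : 0;
      } catch (NumberFormatException e) {
        nums[i] = 0; // non-numeric suffixes such as "incubating" contribute nothing
      }
    }
    return String.format("%03d%03d%03d", nums[0], nums[1], nums[2]);
  }

  public static void main(String[] args) {
    System.out.println(versionKey("0.7.1-incubating")); // 000007001
    System.out.println(versionKey("0.8.0-incubating")); // 000008000
    // The script appends the Scala suffix only for Livy >= 0.8.0:
    boolean needsSuffix =
        versionKey("0.8.0-incubating").compareTo(versionKey("0.8.0")) >= 0;
    System.out.println(needsSuffix); // true
  }
}
```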