[ZEPPELIN-6016] Rewrite and enable Livy integration tests (apache#4743)
* wip

* nit

* nit

* wip

* wip

* fix

* [ZEPPELIN-5973] Bump Livy 0.8.0-incubating

* nit

* Spark 3.5.1

* test

* fix

* comment

* nit

* nit

* nit
pan3793 authored Apr 23, 2024
1 parent 67098fd commit ca2481d
Showing 7 changed files with 195 additions and 178 deletions.
23 changes: 15 additions & 8 deletions .github/workflows/core.yml
@@ -423,7 +423,11 @@ jobs:
rm -rf spark/interpreter/metastore_db
./mvnw verify -pl spark-submit,spark/interpreter -am -Dtest=org/apache/zeppelin/spark/* -Pspark-3.5 -Pspark-scala-2.13 -Phadoop3 -Pintegration -DfailIfNoTests=false ${MAVEN_ARGS}
livy-0-7-with-spark-3-4-1-under-python3:
# The version combination is based on the following facts:
# 1. the official Livy 0.8 binary tarball is built against Spark 2.4
# 2. the official Spark 2.4 binary tarball is built against Scala 2.11
# 3. Spark 2.4 supports Python 2.7 and 3.4 to 3.7
livy-0-8-with-spark-2-4-under-python37:
runs-on: ubuntu-20.04
steps:
- name: Checkout
@@ -449,14 +453,14 @@
- name: install environment
run: |
./mvnw install -DskipTests -pl livy -am ${MAVEN_ARGS}
./testing/downloadSpark.sh "3.4.1" "3"
./testing/downloadLivy.sh "0.7.1-incubating"
- name: Setup conda environment with python 3.9 and R
./testing/downloadSpark.sh "2.4.8" "2.7"
./testing/downloadLivy.sh "0.8.0-incubating" "2.11"
- name: Setup conda environment with python 3.7 and R
uses: conda-incubator/setup-miniconda@v2
with:
activate-environment: python_3_with_R
environment-file: testing/env_python_3_with_R.yml
python-version: 3.9
activate-environment: python_37_with_R
environment-file: testing/env_python_3.7_with_R.yml
python-version: 3.7
miniforge-variant: Mambaforge
channels: conda-forge,defaults
channel-priority: true
@@ -466,7 +470,10 @@
run: |
R -e "IRkernel::installspec()"
- name: run tests
run: ./mvnw verify -pl livy -am ${MAVEN_ARGS}
run: |
export SPARK_HOME=$PWD/spark-2.4.8-bin-hadoop2.7
export LIVY_HOME=$PWD/apache-livy-0.8.0-incubating_2.11-bin
./mvnw verify -pl livy -am ${MAVEN_ARGS}
default-build:
runs-on: ubuntu-20.04
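For reference, the CI job above can be approximated on a local machine. The sketch below simply replays the job's steps outside of GitHub Actions; it assumes you run it from the Zeppelin source root and that a Python 3.7 interpreter (and, for the R paragraphs, an R installation with IRkernel) is already available, since the conda setup step is not reproduced here.

```bash
#!/usr/bin/env bash
# Local approximation of the livy-0-8-with-spark-2-4-under-python37 CI job.
set -euo pipefail

# Build the Livy interpreter module (and its dependencies) without running tests.
./mvnw install -DskipTests -pl livy -am

# Download Spark 2.4.8 (Hadoop 2.7) and Livy 0.8.0-incubating (Scala 2.11),
# matching the version constraints described in the workflow comment.
./testing/downloadSpark.sh "2.4.8" "2.7"
./testing/downloadLivy.sh "0.8.0-incubating" "2.11"

# Point the integration tests at the downloaded distributions and run them.
export SPARK_HOME=$PWD/spark-2.4.8-bin-hadoop2.7
export LIVY_HOME=$PWD/apache-livy-0.8.0-incubating_2.11-bin
./mvnw verify -pl livy -am
```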
9 changes: 4 additions & 5 deletions livy/README.md
@@ -2,14 +2,13 @@
Livy interpreter for Apache Zeppelin

# Prerequisites
You can follow the instructions at [Livy Quick Start](http://livy.io/quickstart.html) to set up livy.
You can follow the instructions at [Livy Get Started](https://livy.apache.org/get-started/) to set up livy.

# Run Integration Tests
You can add integration test to [LivyInterpreter.java](https://github.com/apache/zeppelin/blob/master/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java) and run the integration test either via the CI environment or locally. You need to download livy-0.2 and spark-1.5.2 to local, then use the following script to run the integration test.
You can add integration tests to [LivyInterpreterIT.java](https://github.com/apache/zeppelin/blob/master/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java) and run them either via the CI environment or locally. You need to download Livy 0.8 and Spark 2.4.8 locally, then use the following script to run the integration tests.

```bash
#!/usr/bin/env bash
export LIVY_HOME=<path_of_livy_0.2.0>
export SPARK_HOME=<path_of_spark-1.5.2>
export LIVY_HOME=<path_of_livy_0.8.0>
export SPARK_HOME=<path_of_spark-2.4.8>
./mvnw clean verify -pl livy -DfailIfNoTests=false
```
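If the Livy integration tests are bound to the Maven Failsafe plugin (the `*IT` suffix suggests this, but it is an assumption about the build), a single suite can be selected with Failsafe's `it.test` property, for example:

```bash
# Hypothetical single-suite invocation; if the build runs these tests through
# Surefire instead of Failsafe, use -Dtest=LivyInterpreterIT as the filter.
export LIVY_HOME=<path_of_livy_0.8.0>
export SPARK_HOME=<path_of_spark-2.4.8>
./mvnw clean verify -pl livy -Dit.test=LivyInterpreterIT -DfailIfNoTests=false
```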
116 changes: 4 additions & 112 deletions livy/pom.xml
@@ -37,107 +37,9 @@
<commons.exec.version>1.3</commons.exec.version>
<spring.web.version>4.3.0.RELEASE</spring.web.version>
<spring.security.kerberosclient>1.0.1.RELEASE</spring.security.kerberosclient>

<!--test library versions-->
<livy.version>0.7.1-incubating</livy.version>
<spark.version>2.4.8</spark.version>
<hadoop.version>${hadoop3.3.version}</hadoop.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.livy</groupId>
<artifactId>livy-integration-test</artifactId>
<version>${livy.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-repl_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<artifactId>hadoop-client</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-common</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-hdfs</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-yarn-client</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-yarn-server-tests</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.livy</groupId>
<artifactId>livy-test-lib</artifactId>
<version>${livy.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-repl_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn_${scala.binary.version}</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-exec</artifactId>
@@ -172,26 +74,16 @@
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
<version>${hadoop.version}</version>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-minicluster</artifactId>
<version>${hadoop.version}</version>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<build>
64 changes: 16 additions & 48 deletions livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
@@ -18,8 +18,6 @@
package org.apache.zeppelin.livy;

import org.apache.commons.io.IOUtils;
import org.apache.livy.test.framework.Cluster;
import org.apache.livy.test.framework.Cluster$;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
@@ -31,9 +29,7 @@
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,46 +44,23 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;

@Disabled("FIXME: temporarily disable the broken tests")
public class LivyInterpreterIT {
private static final Logger LOGGER = LoggerFactory.getLogger(LivyInterpreterIT.class);
private static Cluster cluster;
public class LivyInterpreterIT extends WithLivyServer {
private static final Logger LOG = LoggerFactory.getLogger(LivyInterpreterIT.class);
private static Properties properties;

@BeforeAll
public static void setUp() {
public static void beforeAll() throws IOException {
if (!checkPreCondition()) {
return;
}
cluster = Cluster$.MODULE$.get();
LOGGER.info("Starting livy at {}", cluster.livyEndpoint());
WithLivyServer.beforeAll();
properties = new Properties();
properties.setProperty("zeppelin.livy.url", cluster.livyEndpoint());
properties.setProperty("zeppelin.livy.url", LIVY_ENDPOINT);
properties.setProperty("zeppelin.livy.session.create_timeout", "120");
properties.setProperty("zeppelin.livy.spark.sql.maxResult", "100");
properties.setProperty("zeppelin.livy.displayAppInfo", "false");
}

@AfterAll
public static void tearDown() {
if (cluster != null) {
LOGGER.info("Shutting down livy at {}", cluster.livyEndpoint());
cluster.cleanUp();
}
}

public static boolean checkPreCondition() {
if (System.getenv("LIVY_HOME") == null) {
LOGGER.warn(("livy integration is skipped because LIVY_HOME is not set"));
return false;
}
if (System.getenv("SPARK_HOME") == null) {
LOGGER.warn(("livy integration is skipped because SPARK_HOME is not set"));
return false;
}
return true;
}


@Test
void testSparkInterpreter() throws InterpreterException {
@@ -141,7 +114,6 @@ private void testRDD(final LivySparkInterpreter sparkInterpreter, boolean isSpar
.setAuthenticationInfo(authInfo)
.setInterpreterOut(output)
.build();
;

InterpreterResult result = sparkInterpreter.interpret("sc.parallelize(1 to 10).sum()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code(), result.toString());
@@ -294,11 +266,10 @@ private void testDataFrame(LivySparkInterpreter sparkInterpreter,
assertEquals(InterpreterResult.Code.ERROR, result.code());
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());

if (!isSpark2) {
assertTrue(result.message().get(0).getData().contains("Table not found"));
} else {
assertTrue(result.message().get(0).getData().contains("Table or view not found"));
}
String errMsg = result.message().get(0).getData();
assertTrue(errMsg.contains("Table not found") ||
errMsg.contains("Table or view not found") ||
errMsg.contains("TABLE_OR_VIEW_NOT_FOUND"));

// test sql cancel
if (sqlInterpreter.getLivyVersion().newerThanEquals(LivyVersion.LIVY_0_3_0)) {
@@ -431,7 +402,7 @@ void testPySparkInterpreter() throws InterpreterException {
assertTrue(result.message().get(0).getData().contains("[Row(_1=u'hello', _2=20)]")
|| result.message().get(0).getData().contains("[Row(_1='hello', _2=20)]"));
} else {
result = pysparkInterpreter.interpret("df=spark.createDataFrame([(\"hello\",20)])\n"
result = pysparkInterpreter.interpret("df=spark.createDataFrame([('hello',20)])\n"
+ "df.collect()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code(), result.toString());
assertEquals(1, result.message().size());
@@ -485,15 +456,14 @@ public void run() {
}

@Test
void testSparkInterpreterWithDisplayAppInfo_StringWithoutTruncation()
void testSparkInterpreterStringWithoutTruncation()
throws InterpreterException {
if (!checkPreCondition()) {
return;
}
InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
interpreterGroup.put("session_1", new ArrayList<Interpreter>());
Properties properties2 = new Properties(properties);
properties2.put("zeppelin.livy.displayAppInfo", "true");
// enable spark ui because it is disabled by livy integration test
properties2.put("livy.spark.ui.enabled", "true");
properties2.put(LivySparkSQLInterpreter.ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE, "false");
@@ -519,21 +489,19 @@ void testSparkInterpreterWithDisplayAppInfo_StringWithoutTruncation()
try {
InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code(), result.toString());
assertEquals(2, result.message().size());
// check yarn appId and ensure it is not null
assertTrue(result.message().get(1).getData().contains("Spark Application Id: application_"));
assertEquals(1, result.message().size(), result.toString());

// html output
String htmlCode = "println(\"%html <h1> hello </h1>\")";
result = sparkInterpreter.interpret(htmlCode, context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code(), result.toString());
assertEquals(2, result.message().size());
assertEquals(1, result.message().size());
assertEquals(InterpreterResult.Type.HTML, result.message().get(0).getType());

// detect spark version
result = sparkInterpreter.interpret("sc.version", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code(), result.toString());
assertEquals(2, result.message().size());
assertEquals(1, result.message().size());

boolean isSpark2 = isSpark2(sparkInterpreter, context);

@@ -552,7 +520,7 @@ void testSparkInterpreterWithDisplayAppInfo_StringWithoutTruncation()
+ ".toDF(\"col_1\", \"col_2\")\n"
+ "df.collect()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code(), result.toString());
assertEquals(2, result.message().size());
assertEquals(1, result.message().size());
assertTrue(result.message().get(0).getData()
.contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
}
@@ -673,7 +641,7 @@ void testLivyParams() throws InterpreterException {
try {
InterpreterResult result = sparkInterpreter.interpret("sc.version\n" +
"assert(sc.getConf.get(\"spark.executor.cores\") == \"4\" && " +
"sc.getConf.get(\"spark.app.name\") == \"zeppelin-livy\")"
"sc.getConf.get(\"spark.app.name\") == \"zeppelin-livy\")"
, context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code(), result.toString());
assertEquals(1, result.message().size());
livy/src/test/java/org/apache/zeppelin/livy/LivySQLInterpreterTest.java
@@ -35,7 +35,7 @@ class LivySQLInterpreterTest {
private LivySparkSQLInterpreter sqlInterpreter;

@BeforeEach
public void setUp() {
public void beforeEach() {
Properties properties = new Properties();
properties.setProperty("zeppelin.livy.url", "http://localhost:8998");
properties.setProperty("zeppelin.livy.session.create_timeout", "120");