[test](itcase) Add itcase for connector (apache#175)
JNSimba authored Dec 28, 2023
1 parent c3ad3c3 commit 6f5f5a5
Showing 6 changed files with 513 additions and 0 deletions.
44 changes: 44 additions & 0 deletions .github/workflows/run-itcase-12.yml
@@ -0,0 +1,44 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
---
name: Run ITCases 1.2
on:
pull_request:
push:

jobs:
build-extension:
name: "Run ITCases 1.2"
runs-on: ubuntu-latest
defaults:
run:
shell: bash
steps:
- name: Checkout
uses: actions/checkout@master

- name: Setup java
uses: actions/setup-java@v2
with:
distribution: adopt
java-version: '8'

- name: Run ITCases
run: |
cd spark-doris-connector && mvn test -Dtest="*ITCase" -Dimage="adamlee489/doris:1.2.7.1_x86"
44 changes: 44 additions & 0 deletions .github/workflows/run-itcase-20.yml
@@ -0,0 +1,44 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
---
name: Run ITCases 2.0
on:
pull_request:
push:

jobs:
build-extension:
name: "Run ITCases 2.0"
runs-on: ubuntu-latest
defaults:
run:
shell: bash
steps:
- name: Checkout
uses: actions/checkout@master

- name: Setup java
uses: actions/setup-java@v2
with:
distribution: adopt
java-version: '8'

- name: Run ITCases
run: |
cd spark-doris-connector && mvn test -Dtest="*ITCase" -Dimage="adamlee489/doris:2.0.3"
13 changes: 13 additions & 0 deletions spark-doris-connector/pom.xml
@@ -78,6 +78,7 @@
<netty.version>4.1.77.Final</netty.version>
<fasterxml.jackson.version>2.10.5</fasterxml.jackson.version>
<thrift-service.version>1.0.0</thrift-service.version>
<testcontainers.version>1.17.6</testcontainers.version>
</properties>

<dependencies>
@@ -199,6 +200,18 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.2.0</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
137 changes: 137 additions & 0 deletions spark-doris-connector/src/test/java/org/apache/doris/spark/DorisTestBase.java
@@ -0,0 +1,137 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.spark;

import com.google.common.collect.Lists;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerLoggerFactory;

import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Stream;

import static org.awaitility.Awaitility.given;
import static org.awaitility.Durations.ONE_SECOND;

public abstract class DorisTestBase {
protected static final Logger LOG = LoggerFactory.getLogger(DorisTestBase.class);
protected static final String DORIS_DOCKER_IMAGE = System.getProperty("image");
private static final String DRIVER_JAR =
"https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";
protected static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
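    // The FE serves the MySQL protocol on 9030 (see the port bindings published below).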
protected static final String URL = "jdbc:mysql://%s:9030";
protected static final String USERNAME = "root";
public static final String PASSWORD = "";
protected static final GenericContainer DORIS_CONTAINER = createDorisContainer();
protected static Connection connection;

protected static String getFenodes() {
return DORIS_CONTAINER.getHost() + ":8030";
}

@BeforeClass
public static void startContainers() {
LOG.info("Starting containers...");
Startables.deepStart(Stream.of(DORIS_CONTAINER)).join();
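        // Retry the JDBC connection (ignoring exceptions) every second, for up to 5 minutes, until the FE accepts it.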
given().ignoreExceptions()
.await()
.atMost(300, TimeUnit.SECONDS)
.pollInterval(ONE_SECOND)
.untilAsserted(DorisTestBase::initializeJdbcConnection);
LOG.info("Containers are started.");
}

@AfterClass
public static void stopContainers() {
LOG.info("Stopping containers...");
DORIS_CONTAINER.stop();
LOG.info("Containers are stopped.");
}

public static GenericContainer createDorisContainer() {
GenericContainer container =
new GenericContainer<>(DORIS_DOCKER_IMAGE)
.withNetwork(Network.newNetwork())
.withNetworkAliases("DorisContainer")
.withEnv("FE_SERVERS", "fe1:127.0.0.1:9010")
.withEnv("FE_ID", "1")
.withEnv("CURRENT_BE_IP", "127.0.0.1")
.withEnv("CURRENT_BE_PORT", "9050")
.withCommand("ulimit -n 65536")
.withCreateContainerCmdModifier(
cmd -> cmd.getHostConfig().withMemorySwap(0L))
.withPrivilegedMode(true)
.withLogConsumer(
new Slf4jLogConsumer(
DockerLoggerFactory.getLogger(DORIS_DOCKER_IMAGE)));

container.setPortBindings(
Lists.newArrayList(
String.format("%s:%s", "8030", "8030"),
String.format("%s:%s", "9030", "9030"),
String.format("%s:%s", "9060", "9060"),
String.format("%s:%s", "8040", "8040")));

return container;
}

protected static void initializeJdbcConnection() throws SQLException, MalformedURLException {
URLClassLoader urlClassLoader =
new URLClassLoader(
new URL[] {new URL(DRIVER_JAR)}, DorisTestBase.class.getClassLoader());
LOG.info("Try to connect to Doris...");
Thread.currentThread().setContextClassLoader(urlClassLoader);
connection =
DriverManager.getConnection(
String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
try (Statement statement = connection.createStatement()) {
ResultSet resultSet;
do {
LOG.info("Wait for the Backend to start successfully...");
resultSet = statement.executeQuery("show backends");
} while (!isBeReady(resultSet, Duration.ofSeconds(1L)));
}
LOG.info("Connected to Doris successfully...");
}

private static boolean isBeReady(ResultSet rs, Duration duration) throws SQLException {
LockSupport.parkNanos(duration.toNanos());
if (rs.next()) {
String isAlive = rs.getString("Alive").trim();
String totalCap = rs.getString("TotalCapacity").trim();
return "true".equalsIgnoreCase(isAlive) && !"0.000".equalsIgnoreCase(totalCap);
}
return false;
}
}
124 changes: 124 additions & 0 deletions spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/DorisReaderITCase.scala
@@ -0,0 +1,124 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.spark.sql

import org.apache.doris.spark.DorisTestBase
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.Test

import java.sql.Statement

class DorisReaderITCase extends DorisTestBase {

val DATABASE: String = "test"
val TABLE_READ: String = "tbl_read"
val TABLE_READ_TBL: String = "tbl_read_tbl"

@Test
@throws[Exception]
def testRddSource(): Unit = {
initializeTable(TABLE_READ)

val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("rddSource")
val sc = new SparkContext(sparkConf)
import org.apache.doris.spark._
val dorisSparkRDD = sc.dorisRDD(
tableIdentifier = Some(DATABASE + "." + TABLE_READ),
cfg = Some(Map(
"doris.fenodes" -> DorisTestBase.getFenodes,
"doris.request.auth.user" -> DorisTestBase.USERNAME,
"doris.request.auth.password" -> DorisTestBase.PASSWORD
))
)
import scala.collection.JavaConverters._
val result = dorisSparkRDD.collect().toList.asJava
sc.stop()

assert(List(List("doris", 18).asJava, List("spark", 10).asJava).asJava.equals(result))
}

@Test
@throws[Exception]
def testDataFrameSource(): Unit = {
initializeTable(TABLE_READ_TBL)

val session = SparkSession.builder().master("local[*]").getOrCreate()
val dorisSparkDF = session.read
.format("doris")
.option("doris.fenodes", DorisTestBase.getFenodes)
.option("doris.table.identifier", DATABASE + "." + TABLE_READ_TBL)
.option("user", DorisTestBase.USERNAME)
.option("password", DorisTestBase.PASSWORD)
.load()

val result = dorisSparkDF.collect().toList.toString()
session.stop()
assert("List([doris,18], [spark,10])".equals(result))
}

@Test
@throws[Exception]
def testSQLSource(): Unit = {
initializeTable(TABLE_READ_TBL)
val session = SparkSession.builder().master("local[*]").getOrCreate()
session.sql(
s"""
|CREATE TEMPORARY VIEW test_source
|USING doris
|OPTIONS(
| "table.identifier"="${DATABASE + "." + TABLE_READ_TBL}",
| "fenodes"="${DorisTestBase.getFenodes}",
| "user"="${DorisTestBase.USERNAME}",
| "password"="${DorisTestBase.PASSWORD}"
|);
|""".stripMargin)

val result = session.sql(
"""
|select name,age from test_source;
|""".stripMargin).collect().toList.toString()
session.stop()

assert("List([doris,18], [spark,10])".equals(result))
}

@throws[Exception]
private def initializeTable(table: String): Unit = {
try {
val statement: Statement = DorisTestBase.connection.createStatement
try {
statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE))
statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table))
statement.execute(String.format("CREATE TABLE %s.%s ( \n" +
"`name` varchar(256),\n" +
"`age` int\n" +
") DISTRIBUTED BY HASH(`name`) BUCKETS 1\n" +
"PROPERTIES (\n" +
"\"replication_num\" = \"1\"\n" +
")\n", DATABASE, table))
statement.execute(String.format("insert into %s.%s values ('doris',18)", DATABASE, table))
statement.execute(String.format("insert into %s.%s values ('spark',10)", DATABASE, table))
} finally {
if (statement != null) statement.close()
}
}
}


}
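To run these ITCases locally, make sure Docker is available to Testcontainers and pass the Doris image through the image system property that DorisTestBase reads, using the same Maven invocation as the workflows above, for example:

cd spark-doris-connector && mvn test -Dtest="*ITCase" -Dimage="adamlee489/doris:2.0.3"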