diff --git a/.github/workflows/run-itcase-12.yml b/.github/workflows/run-itcase-12.yml
new file mode 100644
index 00000000..87a9ba80
--- /dev/null
+++ b/.github/workflows/run-itcase-12.yml
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+---
+# Runs the connector integration tests (*ITCase) against a Doris 1.2 image.
+name: Run ITCases 1.2
+on:
+  pull_request:
+  push:
+
+jobs:
+  build-extension:
+    name: "Run ITCases 1.2"
+    runs-on: ubuntu-latest
+    defaults:
+      run:
+        shell: bash
+    steps:
+      - name: Checkout
+        # Pinned to a release tag: the moving master branch can change without notice.
+        uses: actions/checkout@v4
+
+      - name: Setup java
+        uses: actions/setup-java@v4
+        with:
+          # Temurin is the maintained successor of the discontinued adopt distribution.
+          distribution: temurin
+          java-version: '8'
+
+      - name: Run ITCases
+        # -Dimage names the Doris Docker image that the integration tests start.
+        run: |
+          cd spark-doris-connector && mvn test -Dtest="*ITCase" -Dimage="adamlee489/doris:1.2.7.1_x86"
+
diff --git a/.github/workflows/run-itcase-20.yml b/.github/workflows/run-itcase-20.yml
new file mode 100644
index 00000000..f972c367
--- /dev/null
+++ b/.github/workflows/run-itcase-20.yml
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+---
+# Runs the connector integration tests (*ITCase) against a Doris 2.0 image.
+name: Run ITCases 2.0
+on:
+  pull_request:
+  push:
+
+jobs:
+  build-extension:
+    name: "Run ITCases 2.0"
+    runs-on: ubuntu-latest
+    defaults:
+      run:
+        shell: bash
+    steps:
+      - name: Checkout
+        # Pinned to a release tag: the moving master branch can change without notice.
+        uses: actions/checkout@v4
+
+      - name: Setup java
+        uses: actions/setup-java@v4
+        with:
+          # Temurin is the maintained successor of the discontinued adopt distribution.
+          distribution: temurin
+          java-version: '8'
+
+      - name: Run ITCases
+        # -Dimage names the Doris Docker image that the integration tests start.
+        run: |
+          cd spark-doris-connector && mvn test -Dtest="*ITCase" -Dimage="adamlee489/doris:2.0.3"
+
diff --git a/spark-doris-connector/pom.xml b/spark-doris-connector/pom.xml
index 89e1716f..1dd5ade4 100644
--- a/spark-doris-connector/pom.xml
+++ b/spark-doris-connector/pom.xml
@@ -78,6 +78,7 @@
4.1.77.Final
2.10.5
1.0.0
+ 1.17.6
@@ -199,6 +200,18 @@
test
+
+ org.testcontainers
+ testcontainers
+ ${testcontainers.version}
+ test
+
+
+ org.awaitility
+ awaitility
+ 4.2.0
+ test
+
diff --git a/spark-doris-connector/src/test/java/org/apache/doris/spark/DorisTestBase.java b/spark-doris-connector/src/test/java/org/apache/doris/spark/DorisTestBase.java
new file mode 100644
index 00000000..9b3989a1
--- /dev/null
+++ b/spark-doris-connector/src/test/java/org/apache/doris/spark/DorisTestBase.java
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark;
+
+import com.google.common.collect.Lists;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+import static org.awaitility.Durations.ONE_SECOND;
+
+/**
+ * Base class for integration tests that run against a Doris instance started in Docker.
+ *
+ * <p>The container is created when this class is loaded, from the image named by the
+ * {@code image} system property. It is started before and stopped after every test class,
+ * and a JDBC connection to it is kept in {@link #connection} while it is running.
+ */
+public abstract class DorisTestBase {
+    protected static final Logger LOG = LoggerFactory.getLogger(DorisTestBase.class);
+    /** Doris Docker image under test, read from the {@code image} system property. */
+    protected static final String DORIS_DOCKER_IMAGE = System.getProperty("image");
+    private static final String DRIVER_JAR =
+            "https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";
+    protected static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
+    /** JDBC URL template; {@code %s} is replaced with the container host. */
+    protected static final String URL = "jdbc:mysql://%s:9030";
+    protected static final String USERNAME = "root";
+    public static final String PASSWORD = "";
+    protected static final GenericContainer DORIS_CONTAINER = createDorisContainer();
+    /** Opened by {@link #startContainers()}, closed by {@link #stopContainers()}. */
+    protected static Connection connection;
+
+    /** Returns the container host followed by {@code :8030}. */
+    protected static String getFenodes() {
+        return DORIS_CONTAINER.getHost() + ":8030";
+    }
+
+    /**
+     * Starts the Doris container, then retries {@link #initializeJdbcConnection()} once per
+     * second, for at most 300 seconds, until it succeeds.
+     */
+    @BeforeClass
+    public static void startContainers() {
+        LOG.info("Starting containers...");
+        Startables.deepStart(Stream.of(DORIS_CONTAINER)).join();
+        given().ignoreExceptions()
+                .await()
+                .atMost(300, TimeUnit.SECONDS)
+                .pollInterval(ONE_SECOND)
+                .untilAsserted(DorisTestBase::initializeJdbcConnection);
+        LOG.info("Containers are started.");
+    }
+
+    /** Closes the shared JDBC connection, if one is open, and stops the Doris container. */
+    @AfterClass
+    public static void stopContainers() {
+        LOG.info("Stopping containers...");
+        closeQuietly(connection);
+        connection = null;
+        DORIS_CONTAINER.stop();
+        LOG.info("Containers are stopped.");
+    }
+
+    /**
+     * Builds, without starting it, the Doris container with host ports 8030, 9030, 9060 and
+     * 8040 bound to the same container ports.
+     *
+     * @return the configured container
+     * @throws IllegalStateException if the {@code image} system property is missing or blank
+     */
+    public static GenericContainer createDorisContainer() {
+        // Fail early with an actionable message when -Dimage was not supplied.
+        if (DORIS_DOCKER_IMAGE == null || DORIS_DOCKER_IMAGE.trim().isEmpty()) {
+            throw new IllegalStateException(
+                    "System property 'image' must name the Doris Docker image, "
+                            + "e.g. -Dimage=<repository>:<tag>");
+        }
+        GenericContainer container =
+                new GenericContainer<>(DORIS_DOCKER_IMAGE)
+                        .withNetwork(Network.newNetwork())
+                        .withNetworkAliases("DorisContainer")
+                        .withEnv("FE_SERVERS", "fe1:127.0.0.1:9010")
+                        .withEnv("FE_ID", "1")
+                        .withEnv("CURRENT_BE_IP", "127.0.0.1")
+                        .withEnv("CURRENT_BE_PORT", "9050")
+                        .withCommand("ulimit -n 65536")
+                        .withCreateContainerCmdModifier(
+                                cmd -> cmd.getHostConfig().withMemorySwap(0L))
+                        .withPrivilegedMode(true)
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        DockerLoggerFactory.getLogger(DORIS_DOCKER_IMAGE)));
+
+        container.setPortBindings(
+                Lists.newArrayList(
+                        String.format("%s:%s", "8030", "8030"),
+                        String.format("%s:%s", "9030", "9030"),
+                        String.format("%s:%s", "9060", "9060"),
+                        String.format("%s:%s", "8040", "8040")));
+
+        return container;
+    }
+
+    /**
+     * Opens a JDBC connection and waits until {@code show backends} reports a ready backend,
+     * then stores the connection in {@link #connection}.
+     *
+     * <p>{@link #startContainers()} retries this method, so a connection whose readiness
+     * probe fails is closed before the failure propagates rather than being leaked.
+     *
+     * @throws SQLException if connecting or querying fails, or the thread is interrupted
+     * @throws MalformedURLException if {@code DRIVER_JAR} is not a valid URL
+     */
+    protected static void initializeJdbcConnection() throws SQLException, MalformedURLException {
+        URLClassLoader urlClassLoader =
+                new URLClassLoader(
+                        new URL[] {new URL(DRIVER_JAR)}, DorisTestBase.class.getClassLoader());
+        LOG.info("Try to connect to Doris...");
+        Thread.currentThread().setContextClassLoader(urlClassLoader);
+        Connection candidate =
+                DriverManager.getConnection(
+                        String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
+        try (Statement statement = candidate.createStatement()) {
+            boolean backendReady;
+            do {
+                // parkNanos in isBeReady returns at once for an interrupted thread; stop polling.
+                if (Thread.currentThread().isInterrupted()) {
+                    throw new SQLException("Interrupted while waiting for the Doris backend.");
+                }
+                LOG.info("Wait for the Backend to start successfully...");
+                try (ResultSet resultSet = statement.executeQuery("show backends")) {
+                    backendReady = isBeReady(resultSet, Duration.ofSeconds(1L));
+                }
+            } while (!backendReady);
+        } catch (SQLException | RuntimeException e) {
+            closeQuietly(candidate);
+            throw e;
+        }
+        connection = candidate;
+        LOG.info("Connected to Doris successfully...");
+    }
+
+    /**
+     * Waits for {@code duration}, then reads the first row of a {@code show backends} result.
+     *
+     * @return {@code true} if that row has {@code Alive} equal to {@code true} and a
+     *     {@code TotalCapacity} other than {@code 0.000}; {@code false} otherwise, including
+     *     when the result has no rows
+     */
+    private static boolean isBeReady(ResultSet rs, Duration duration) throws SQLException {
+        LockSupport.parkNanos(duration.toNanos());
+        if (rs.next()) {
+            String isAlive = rs.getString("Alive").trim();
+            String totalCap = rs.getString("TotalCapacity").trim();
+            return "true".equalsIgnoreCase(isAlive) && !"0.000".equalsIgnoreCase(totalCap);
+        }
+        return false;
+    }
+
+    /** Closes {@code conn} if it is non-null; a failure is logged, not propagated. */
+    private static void closeQuietly(Connection conn) {
+        if (conn == null) {
+            return;
+        }
+        try {
+            conn.close();
+        } catch (SQLException e) {
+            LOG.warn("Failed to close the JDBC connection to Doris.", e);
+        }
+    }
+}
diff --git a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/DorisReaderITCase.scala b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/DorisReaderITCase.scala
new file mode 100644
index 00000000..a4a288ae
--- /dev/null
+++ b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/DorisReaderITCase.scala
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.sql
+
+import org.apache.doris.spark.DorisTestBase
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.{SparkConf, SparkContext}
+import org.junit.Test
+
+import java.sql.Statement
+
+/**
+ * Integration tests that read a Doris table through the RDD, DataFrame and Spark SQL entry
+ * points. Every test recreates its table with two known rows and checks what Spark reads back.
+ */
+class DorisReaderITCase extends DorisTestBase {
+
+  val DATABASE: String = "test"
+  val TABLE_READ: String = "tbl_read"
+  val TABLE_READ_TBL: String = "tbl_read_tbl"
+
+  /** Reads `tbl_read` with `sc.dorisRDD` and checks the collected rows. */
+  @Test
+  @throws[Exception]
+  def testRddSource(): Unit = {
+    initializeTable(TABLE_READ)
+
+    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("rddSource")
+    val sc = new SparkContext(sparkConf)
+    import org.apache.doris.spark._
+    import scala.collection.JavaConverters._
+    // Stop the context even if the read throws, so a failure does not leave it running.
+    val result = try {
+      val dorisSparkRDD = sc.dorisRDD(
+        tableIdentifier = Some(DATABASE + "." + TABLE_READ),
+        cfg = Some(Map(
+          "doris.fenodes" -> DorisTestBase.getFenodes,
+          "doris.request.auth.user" -> DorisTestBase.USERNAME,
+          "doris.request.auth.password" -> DorisTestBase.PASSWORD
+        ))
+      )
+      dorisSparkRDD.collect().toList.asJava
+    } finally {
+      sc.stop()
+    }
+
+    assert(List(List("doris", 18).asJava, List("spark", 10).asJava).asJava.equals(result))
+  }
+
+  /** Reads `tbl_read_tbl` through the `doris` DataFrame source and checks the rows. */
+  @Test
+  @throws[Exception]
+  def testDataFrameSource(): Unit = {
+    initializeTable(TABLE_READ_TBL)
+
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+    // Stop the session even if the read throws.
+    val result = try {
+      val dorisSparkDF = session.read
+        .format("doris")
+        .option("doris.fenodes", DorisTestBase.getFenodes)
+        .option("doris.table.identifier", DATABASE + "." + TABLE_READ_TBL)
+        .option("user", DorisTestBase.USERNAME)
+        .option("password", DorisTestBase.PASSWORD)
+        .load()
+      dorisSparkDF.collect().toList.toString()
+    } finally {
+      session.stop()
+    }
+    assert("List([doris,18], [spark,10])".equals(result))
+  }
+
+  /** Reads `tbl_read_tbl` through a temporary view defined in Spark SQL and checks the rows. */
+  @Test
+  @throws[Exception]
+  def testSQLSource(): Unit = {
+    initializeTable(TABLE_READ_TBL)
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+    // Stop the session even if the view definition or the query throws.
+    val result = try {
+      session.sql(
+        s"""
+           |CREATE TEMPORARY VIEW test_source
+           |USING doris
+           |OPTIONS(
+           | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL}",
+           | "fenodes"="${DorisTestBase.getFenodes}",
+           | "user"="${DorisTestBase.USERNAME}",
+           | "password"="${DorisTestBase.PASSWORD}"
+           |);
+           |""".stripMargin)
+
+      session.sql(
+        """
+          |select name,age from test_source;
+          |""".stripMargin).collect().toList.toString()
+    } finally {
+      session.stop()
+    }
+
+    assert("List([doris,18], [spark,10])".equals(result))
+  }
+
+  /**
+   * Drops and recreates `DATABASE.table` with columns `name` and `age`, then inserts the rows
+   * ('doris', 18) and ('spark', 10).
+   */
+  @throws[Exception]
+  private def initializeTable(table: String): Unit = {
+    val statement: Statement = DorisTestBase.connection.createStatement
+    try {
+      statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE))
+      statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table))
+      statement.execute(String.format("CREATE TABLE %s.%s ( \n" +
+        "`name` varchar(256),\n" +
+        "`age` int\n" +
+        ") DISTRIBUTED BY HASH(`name`) BUCKETS 1\n" +
+        "PROPERTIES (\n" +
+        "\"replication_num\" = \"1\"\n" +
+        ")\n", DATABASE, table))
+      statement.execute(String.format("insert into %s.%s values ('doris',18)", DATABASE, table))
+      statement.execute(String.format("insert into %s.%s values ('spark',10)", DATABASE, table))
+    } finally {
+      statement.close()
+    }
+  }
+
+}
diff --git a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/DorisWriterITCase.scala b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/DorisWriterITCase.scala
new file mode 100644
index 00000000..747c0bdc
--- /dev/null
+++ b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/DorisWriterITCase.scala
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.sql
+
+import org.apache.doris.spark.DorisTestBase
+import org.apache.spark.sql.SparkSession
+import org.junit.Test
+
+import java.sql.{ResultSet, Statement}
+import scala.collection.mutable.ListBuffer
+
+/**
+ * Integration tests that write a DataFrame to Doris through the `doris` data source, using
+ * the CSV and JSON load formats and a Spark SQL `insert`, and then read the table back
+ * over JDBC to check the stored rows.
+ */
+class DorisWriterITCase extends DorisTestBase {
+
+  val DATABASE: String = "test"
+  val TABLE_CSV: String = "tbl_csv"
+  val TABLE_JSON: String = "tbl_json"
+  val TABLE_JSON_TBL: String = "tbl_json_tbl"
+
+  /** Writes two rows using the CSV load format and checks that they are stored. */
+  @Test
+  @throws[Exception]
+  def testSinkCsvFormat(): Unit = {
+    initializeTable(TABLE_CSV)
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+    // Stop the session even if the write throws.
+    try {
+      val df = session.createDataFrame(Seq(
+        ("doris_csv", 1),
+        ("spark_csv", 2)
+      )).toDF("name", "age")
+      df.write
+        .format("doris")
+        .option("doris.fenodes", DorisTestBase.getFenodes)
+        .option("doris.table.identifier", DATABASE + "." + TABLE_CSV)
+        .option("user", DorisTestBase.USERNAME)
+        .option("password", DorisTestBase.PASSWORD)
+        // Load properties are passed under the "sink.properties." prefix.
+        .option("sink.properties.column_separator", ",")
+        .option("sink.properties.line_delimiter", "\n")
+        .option("sink.properties.format", "csv")
+        .save()
+    } finally {
+      session.stop()
+    }
+
+    Thread.sleep(10000)
+    val actual = queryResult(TABLE_CSV)
+    val expected = ListBuffer(List("doris_csv", 1), List("spark_csv", 2))
+    assert(expected.equals(actual))
+  }
+
+  /** Writes two rows using the JSON load format and checks that they are stored. */
+  @Test
+  @throws[Exception]
+  def testSinkJsonFormat(): Unit = {
+    initializeTable(TABLE_JSON)
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+    // Stop the session even if the write throws.
+    try {
+      val df = session.createDataFrame(Seq(
+        ("doris_json", 1),
+        ("spark_json", 2)
+      )).toDF("name", "age")
+      df.write
+        .format("doris")
+        .option("doris.fenodes", DorisTestBase.getFenodes)
+        .option("doris.table.identifier", DATABASE + "." + TABLE_JSON)
+        .option("user", DorisTestBase.USERNAME)
+        .option("password", DorisTestBase.PASSWORD)
+        // Load properties are passed under the "sink.properties." prefix.
+        .option("sink.properties.read_json_by_line", "true")
+        .option("sink.properties.format", "json")
+        .save()
+    } finally {
+      session.stop()
+    }
+
+    Thread.sleep(10000)
+    val actual = queryResult(TABLE_JSON)
+    val expected = ListBuffer(List("doris_json", 1), List("spark_json", 2))
+    assert(expected.equals(actual))
+  }
+
+  /** Writes two rows with a Spark SQL insert into a `doris` view and checks they are stored. */
+  @Test
+  @throws[Exception]
+  def testSQLSinkFormat(): Unit = {
+    initializeTable(TABLE_JSON_TBL)
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+    // Stop the session even if the view definition or the insert throws.
+    try {
+      val df = session.createDataFrame(Seq(
+        ("doris_tbl", 1),
+        ("spark_tbl", 2)
+      )).toDF("name", "age")
+      df.createTempView("mock_source")
+      session.sql(
+        s"""
+           |CREATE TEMPORARY VIEW test_sink
+           |USING doris
+           |OPTIONS(
+           | "table.identifier"="${DATABASE + "." + TABLE_JSON_TBL}",
+           | "fenodes"="${DorisTestBase.getFenodes}",
+           | "user"="${DorisTestBase.USERNAME}",
+           | "password"="${DorisTestBase.PASSWORD}"
+           |);
+           |""".stripMargin)
+      session.sql(
+        """
+          |insert into test_sink select name,age from mock_source ;
+          |""".stripMargin)
+    } finally {
+      session.stop()
+    }
+
+    Thread.sleep(10000)
+    val actual = queryResult(TABLE_JSON_TBL)
+    val expected = ListBuffer(List("doris_tbl", 1), List("spark_tbl", 2))
+    assert(expected.equals(actual))
+  }
+
+  /**
+   * Returns the `name` and `age` of every row in `DATABASE.table`, ordered by the first
+   * selected column, as one two-element list per row.
+   */
+  private def queryResult(table: String): ListBuffer[Any] = {
+    val actual = new ListBuffer[Any]
+    val sinkStatement: Statement = DorisTestBase.connection.createStatement
+    try {
+      val sinkResultSet: ResultSet = sinkStatement.executeQuery(String.format("select name,age from %s.%s order by 1", DATABASE, table))
+      try {
+        while (sinkResultSet.next) {
+          actual += List(sinkResultSet.getString("name"), sinkResultSet.getInt("age"))
+        }
+      } finally {
+        sinkResultSet.close()
+      }
+    } finally {
+      sinkStatement.close()
+    }
+    actual
+  }
+
+  /** Drops and recreates `DATABASE.table`, empty, with columns `name` and `age`. */
+  @throws[Exception]
+  private def initializeTable(table: String): Unit = {
+    val statement: Statement = DorisTestBase.connection.createStatement
+    try {
+      statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE))
+      statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table))
+      statement.execute(String.format(
+        "CREATE TABLE %s.%s ( \n" + "`name` varchar(256),\n" + "`age` int\n" + ") " +
+          "DISTRIBUTED BY HASH(`name`) BUCKETS 1\n" +
+          "PROPERTIES (\n" +
+          "\"replication_num\" = \"1\"\n" + ")\n", DATABASE, table))
+    } finally {
+      statement.close()
+    }
+  }
+
+}