diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index b585bc1f7..fcfc0fd41 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -13,32 +13,38 @@ jobs:
   build:
     runs-on: ubuntu-latest
     strategy:
+      fail-fast: false
       matrix:
-        scala: [2.12.11, 2.13.11]
-        db-version: [3.11.10, 4.0-rc2, 6.8.13]
+        scala: [2.12.19, 2.13.13]
+        db-version: [3.11.17, 4.0.12, 4.1.4, 5.0-beta1, dse-6.8.44]
     steps:
-      - uses: actions/checkout@v2
+      - uses: actions/checkout@v4
       - name: ccm pip installation
         uses: BSFishy/pip-action@v1
         with:
-          packages: git+https://github.com/riptano/ccm.git@435f3210e16d0b648fbf33d6390d5ab4c9e630d4
+          packages: git+https://github.com/riptano/ccm.git@d74db63d75112908a77b6c80757df9343fdc3338
-      - name: Setup Scala
-        uses: olafurpg/setup-scala@v10
+      - name: Setup Java
+        uses: actions/setup-java@v4
         with:
-          java-version: "adopt@1.8"
+          distribution: "temurin"
+          java-version: | # order is important, the last one is the default which will be used by SBT
+            11
+            8
       - name: sbt tests
         env:
           TEST_PARALLEL_TASKS: 1
           CCM_CASSANDRA_VERSION: ${{ matrix.db-version }}
           PUBLISH_VERSION: test
+          JAVA8_HOME: ${{ env.JAVA_HOME_8_X64 }}
+          JAVA11_HOME: ${{ env.JAVA_HOME_11_X64 }}
         run: sbt/sbt ++${{ matrix.scala }} test it:test
       - name: Publish Test Report
-        uses: mikepenz/action-junit-report@v3
+        uses: mikepenz/action-junit-report@v4
         if: always()
         with:
           report_paths: '**/target/test-reports/*.xml'
diff --git a/README.md b/README.md
index 7878a8913..2ec1a3491 100644
--- a/README.md
+++ b/README.md
@@ -53,26 +53,27 @@ Currently, the following branches are actively supported:
 3.0.x ([b3.0](https://github.com/datastax/spark-cassandra-connector/tree/b3.0)) and
 2.5.x ([b2.5](https://github.com/datastax/spark-cassandra-connector/tree/b2.5)).
-| Connector | Spark | Cassandra | Cassandra Java Driver | Minimum Java Version | Supported Scala Versions |
-|-----------|---------------|-----------------------| --------------------- | -------------------- | ----------------------- |
-| 3.5 | 3.5 | 2.1.5*, 2.2, 3.x, 4.x | 4.13 | 8 | 2.12, 2.13 |
-| 3.4 | 3.4 | 2.1.5*, 2.2, 3.x, 4.x | 4.13 | 8 | 2.12, 2.13 |
-| 3.3 | 3.3 | 2.1.5*, 2.2, 3.x, 4.x | 4.13 | 8 | 2.12 |
-| 3.2 | 3.2 | 2.1.5*, 2.2, 3.x, 4.0 | 4.13 | 8 | 2.12 |
-| 3.1 | 3.1 | 2.1.5*, 2.2, 3.x, 4.0 | 4.12 | 8 | 2.12 |
-| 3.0 | 3.0 | 2.1.5*, 2.2, 3.x, 4.0 | 4.12 | 8 | 2.12 |
-| 2.5 | 2.4 | 2.1.5*, 2.2, 3.x, 4.0 | 4.12 | 8 | 2.11, 2.12 |
-| 2.4.2 | 2.4 | 2.1.5*, 2.2, 3.x | 3.0 | 8 | 2.11, 2.12 |
-| 2.4 | 2.4 | 2.1.5*, 2.2, 3.x | 3.0 | 8 | 2.11 |
-| 2.3 | 2.3 | 2.1.5*, 2.2, 3.x | 3.0 | 8 | 2.11 |
-| 2.0 | 2.0, 2.1, 2.2 | 2.1.5*, 2.2, 3.x | 3.0 | 8 | 2.10, 2.11 |
-| 1.6 | 1.6 | 2.1.5*, 2.2, 3.0 | 3.0 | 7 | 2.10, 2.11 |
-| 1.5 | 1.5, 1.6 | 2.1.5*, 2.2, 3.0 | 3.0 | 7 | 2.10, 2.11 |
-| 1.4 | 1.4 | 2.1.5* | 2.1 | 7 | 2.10, 2.11 |
-| 1.3 | 1.3 | 2.1.5* | 2.1 | 7 | 2.10, 2.11 |
-| 1.2 | 1.2 | 2.1, 2.0 | 2.1 | 7 | 2.10, 2.11 |
-| 1.1 | 1.1, 1.0 | 2.1, 2.0 | 2.1 | 7 | 2.10, 2.11 |
-| 1.0 | 1.0, 0.9 | 2.0 | 2.0 | 7 | 2.10, 2.11 |
+| Connector | Spark | Cassandra | Cassandra Java Driver | Minimum Java Version | Supported Scala Versions |
+|-----------|---------------|----------------------------|-----------------------|----------------------|--------------------------|
+| 3.5.1 | 3.5 | 2.1.5*, 2.2, 3.x, 4.x, 5.0 | 4.18 | 8 | 2.12, 2.13 |
+| 3.5 | 3.5 | 2.1.5*, 2.2, 3.x, 4.x | 4.13 | 8 | 2.12, 2.13 |
+| 3.4 | 3.4 | 2.1.5*, 2.2, 3.x, 4.x | 4.13 | 8 | 2.12, 2.13 |
+| 3.3 | 3.3 | 2.1.5*, 2.2, 3.x, 4.x | 4.13 | 8 | 2.12 |
+| 3.2 | 3.2 | 2.1.5*, 2.2, 3.x, 4.0 | 4.13 | 8 | 2.12 |
+| 3.1 | 3.1 | 2.1.5*, 2.2, 3.x, 4.0 | 4.12 | 8 | 2.12 |
+| 3.0 | 3.0 | 2.1.5*, 2.2, 3.x, 4.0 | 4.12 | 8 | 2.12 |
+| 2.5 | 2.4 | 2.1.5*, 2.2, 3.x, 4.0 | 4.12 | 8 | 2.11, 2.12 |
+| 2.4.2 | 2.4 | 2.1.5*, 2.2, 3.x | 3.0 | 8 | 2.11, 2.12 |
+| 2.4 | 2.4 | 2.1.5*, 2.2, 3.x | 3.0 | 8 | 2.11 |
+| 2.3 | 2.3 | 2.1.5*, 2.2, 3.x | 3.0 | 8 | 2.11 |
+| 2.0 | 2.0, 2.1, 2.2 | 2.1.5*, 2.2, 3.x | 3.0 | 8 | 2.10, 2.11 |
+| 1.6 | 1.6 | 2.1.5*, 2.2, 3.0 | 3.0 | 7 | 2.10, 2.11 |
+| 1.5 | 1.5, 1.6 | 2.1.5*, 2.2, 3.0 | 3.0 | 7 | 2.10, 2.11 |
+| 1.4 | 1.4 | 2.1.5* | 2.1 | 7 | 2.10, 2.11 |
+| 1.3 | 1.3 | 2.1.5* | 2.1 | 7 | 2.10, 2.11 |
+| 1.2 | 1.2 | 2.1, 2.0 | 2.1 | 7 | 2.10, 2.11 |
+| 1.1 | 1.1, 1.0 | 2.1, 2.0 | 2.1 | 7 | 2.10, 2.11 |
+| 1.0 | 1.0, 0.9 | 2.0 | 2.0 | 7 | 2.10, 2.11 |
 **Compatible with 2.1.X where X >= 5*
@@ -193,14 +194,13 @@ Note that the integration tests require [CCM](https://github.com/riptano/ccm) to
 See [Tips for Developing the Spark Cassandra Connector](doc/developers.md) for details.
 By default, integration tests start up a separate, single Cassandra instance and run Spark in local mode.
-It is possible to run integration tests with your own Cassandra and/or Spark cluster.
+It is possible to run integration tests with your own Spark cluster.
 First, prepare a jar with testing code:
     ./sbt/sbt test:package
 Then copy the generated test jar to your Spark nodes and run:
-    export IT_TEST_CASSANDRA_HOST=
     export IT_TEST_SPARK_MASTER=
     ./sbt/sbt it:test
diff --git a/build.sbt b/build.sbt
index 7321278b1..028fcded4 100644
--- a/build.sbt
+++ b/build.sbt
@@ -3,8 +3,8 @@
 import sbt.Keys.parallelExecution
 import sbt.{Compile, moduleFilter, _}
 import sbtassembly.AssemblyPlugin.autoImport.assembly
-lazy val scala212 = "2.12.11"
-lazy val scala213 = "2.13.11"
+lazy val scala212 = "2.12.19"
+lazy val scala213 = "2.13.13"
 lazy val supportedScalaVersions = List(scala212, scala213)
 // factor out common settings
diff --git a/connector/src/it/resources/log4j2.properties b/connector/src/it/resources/log4j2.properties
new file mode 100644
index 000000000..2e159ab88
--- /dev/null
+++ b/connector/src/it/resources/log4j2.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the console
+rootLogger.level = warn
+rootLogger.appenderRef.stdout.ref = console
+
+appender.console.type = Console
+appender.console.name = console
+appender.console.target = SYSTEM_OUT
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %5p %d{HH:mm:ss,SSS} [T%X{TEST_GROUP_NO}] %C (%F:%L) - %m%n
+
+logger.ccm.name = com.datastax.spark.connector.ccm
+logger.ccm.level = info
diff --git a/connector/src/it/resources/logback.xml b/connector/src/it/resources/logback.xml
index a54a45b00..2cdda1e8c 100644
--- a/connector/src/it/resources/logback.xml
+++ b/connector/src/it/resources/logback.xml
@@ -19,4 +19,5 @@
+
diff --git a/connector/src/it/scala/com/datastax/spark/connector/cluster/Fixtures.scala b/connector/src/it/scala/com/datastax/spark/connector/cluster/Fixtures.scala
index ccd26118b..da6c42ec4 100644
--- a/connector/src/it/scala/com/datastax/spark/connector/cluster/Fixtures.scala
+++ b/connector/src/it/scala/com/datastax/spark/connector/cluster/Fixtures.scala
@@ -136,9 +136,15 @@ trait AuthCluster extends SingleClusterFixture {
         "authentication_options.enabled" -> "true"
       )))
     } else {
-      Seq(sslConf.copy(cassandraConfiguration = sslConf.cassandraConfiguration ++ Map(
-        "authenticator" -> "PasswordAuthenticator"
-      )))
+      if (defaultConfig.getCassandraVersion.compareTo(CcmConfig.V5_0_0) >= 0) {
+        Seq(sslConf.copy(cassandraConfiguration = sslConf.cassandraConfiguration ++ Map(
+          "authenticator.class_name" -> "PasswordAuthenticator"
+        )))
+      } else {
+        Seq(sslConf.copy(cassandraConfiguration = sslConf.cassandraConfiguration ++ Map(
+          "authenticator" -> "PasswordAuthenticator"
+        )))
+      }
     }
   }
diff --git a/connector/src/it/scala/com/datastax/spark/connector/cql/sai/IndexedKeySpec.scala b/connector/src/it/scala/com/datastax/spark/connector/cql/sai/IndexedKeySpec.scala
index 8f3be756a..8a5542006 100644
--- a/connector/src/it/scala/com/datastax/spark/connector/cql/sai/IndexedKeySpec.scala
+++ b/connector/src/it/scala/com/datastax/spark/connector/cql/sai/IndexedKeySpec.scala
@@ -1,7 +1,7 @@
 package com.datastax.spark.connector.cql.sai
 import com.datastax.spark.connector.SparkCassandraITWordSpecBase
-import com.datastax.spark.connector.ccm.CcmConfig.V6_8_3
+import com.datastax.spark.connector.ccm.CcmConfig.DSE_V6_8_3
 import com.datastax.spark.connector.cluster.DefaultCluster
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.sources._
@@ -9,7 +9,7 @@
 class IndexedKeySpec extends SparkCassandraITWordSpecBase with DefaultCluster with SaiBaseSpec {
   override def beforeClass {
-    dseFrom(V6_8_3) {
+    dseFrom(DSE_V6_8_3) {
       conn.withSessionDo { session =>
         createKeyspace(session, ks)
         session.execute(
@@ -46,7 +46,7 @@ class IndexedKeySpec extends SparkCassandraITWordSpecBase with DefaultCluster wi
   }
   "Index on partition key columns" should {
-    "allow for predicate push down for indexed parts of the partition key" in dseFrom(V6_8_3) {
+    "allow for predicate push down for indexed parts of the partition key" in dseFrom(DSE_V6_8_3) {
       assertPushedPredicate(
         df("pk_test").filter(col("pk_1") === 1),
         pushedPredicate = EqualTo("pk_1", 1))
@@ -64,13 +64,13 @@ class IndexedKeySpec extends SparkCassandraITWordSpecBase with DefaultCluster wi
         pushedPredicate = GreaterThanOrEqual("pk_2", 1))
     }
-    "allow for multiple predicate push down for the same indexed part of the partition key" in dseFrom(V6_8_3) {
+    "allow for multiple predicate push down for the same indexed part of the partition key" in dseFrom(DSE_V6_8_3) {
       assertPushedPredicate(
         df("pk_test").filter(col("pk_3") < 10 and col("pk_3") > 0),
         pushedPredicate = LessThan("pk_3", 10), GreaterThan("pk_3", 0))
     }
-    "allow for multiple range predicate push down for different indexed parts of the partition key" in dseFrom(V6_8_3) {
+    "allow for multiple range predicate push down for different indexed parts of the partition key" in dseFrom(DSE_V6_8_3) {
       assertPushedPredicate(
         df("pk_test").filter(col("pk_3") < 10 and col("pk_1") > 0),
         pushedPredicate = LessThan("pk_3", 10), GreaterThan("pk_1", 0))
@@ -82,7 +82,7 @@ class IndexedKeySpec extends SparkCassandraITWordSpecBase with DefaultCluster wi
         pushedPredicate = EqualTo("pk_3", 10), LessThan("v_1", 1))
     }
-    "allow for range predicate push down for the partition key" in dseFrom(V6_8_3) {
+    "allow for range predicate push down for the partition key" in dseFrom(DSE_V6_8_3) {
       assertPushedPredicate(
         df("pk_test").filter(col("pk_3") < 10 and col("pk_1") > 0 and col("pk_2") >= 0),
         pushedPredicate = LessThan("pk_3", 10), GreaterThan("pk_1", 0), GreaterThanOrEqual("pk_2", 0))
@@ -91,7 +91,7 @@ class IndexedKeySpec extends SparkCassandraITWordSpecBase with DefaultCluster wi
         pushedPredicate = EqualTo("pk_3", 10), LessThan("pk_1", 6), EqualTo("pk_2", 1))
     }
-    "not allow for regular column predicate push down if any part of the partition key has an IN clause" in dseFrom(V6_8_3) {
+    "not allow for regular column predicate push down if any part of the partition key has an IN clause" in dseFrom(DSE_V6_8_3) {
       assertNonPushedColumns(
         df("pk_test").filter("pk_1 = 1 and pk_2 = 2 and pk_3 in(1, 3) and v_1 < 5"),
         nonPushedColumns = "v_1")
@@ -103,18 +103,18 @@ class IndexedKeySpec extends SparkCassandraITWordSpecBase with DefaultCluster wi
         nonPushedColumns = "v_1")
     }
-    "allow for regular column predicate push down if a part of the clustering key has an IN clause" in dseFrom(V6_8_3) {
+    "allow for regular column predicate push down if a part of the clustering key has an IN clause" in dseFrom(DSE_V6_8_3) {
       assertPushedPredicate(
         df("pk_test").filter("pk_1 = 1 and pk_2 = 2 and pk_3 = 3 and ck_1 in (1,2) and v_1 < 5"),
         pushedPredicate = EqualTo("pk_1", 1), EqualTo("pk_2", 2), EqualTo("pk_3", 3), In("ck_1", Array(1, 2)), LessThan("v_1", 5))
     }
-    "not allow for push down if more than one equality predicate is defined" in dseFrom(V6_8_3) {
+    "not allow for push down if more than one equality predicate is defined" in dseFrom(DSE_V6_8_3) {
       val data = df("pk_test").filter(col("pk_1") === 7 and col("pk_1") === 10)
       assertPushedPredicate(data, pushedPredicate = EqualTo("pk_1", 7))
     }
-    "allow only for equality push down if equality and range predicates are defined for the same pk column" in dseFrom(V6_8_3) {
+    "allow only for equality push down if equality and range predicates are defined for the same pk column" in dseFrom(DSE_V6_8_3) {
       val data = df("pk_test").filter(col("pk_1") === 7 and col("pk_1") < 10)
       assertPushedPredicate(data, pushedPredicate = EqualTo("pk_1", 7))
       data.count() shouldBe 2
@@ -122,13 +122,13 @@ class IndexedKeySpec extends SparkCassandraITWordSpecBase with DefaultCluster wi
   }
   "Index on clustering key columns" should {
-    "allow for predicate push down for indexed parts of the clustering key" in dseFrom(V6_8_3) {
+    "allow for predicate push down for indexed parts of the clustering key" in dseFrom(DSE_V6_8_3) {
       assertPushedPredicate(
         df("pk_test").filter(col("ck_2") === 1),
         pushedPredicate = EqualTo("ck_2", 1))
     }
-    "not allow for predicate push down for non-indexed parts of the clustering key" in dseFrom(V6_8_3) {
+    "not allow for predicate push down for non-indexed parts of the clustering key" in dseFrom(DSE_V6_8_3) {
       assertNoPushDown(df("pk_test").filter(col("ck_3") === 1))
     }
   }
diff --git a/connector/src/it/scala/com/datastax/spark/connector/cql/sai/IndexedListSpec.scala b/connector/src/it/scala/com/datastax/spark/connector/cql/sai/IndexedListSpec.scala
index 1c0e9e61c..8ba7e5c84 100644
--- a/connector/src/it/scala/com/datastax/spark/connector/cql/sai/IndexedListSpec.scala
+++ b/connector/src/it/scala/com/datastax/spark/connector/cql/sai/IndexedListSpec.scala
@@ -1,14 +1,14 @@
 package com.datastax.spark.connector.cql.sai
 import com.datastax.spark.connector.SparkCassandraITWordSpecBase
-import com.datastax.spark.connector.ccm.CcmConfig.V6_8_3
+import com.datastax.spark.connector.ccm.CcmConfig.DSE_V6_8_3
 import com.datastax.spark.connector.cluster.DefaultCluster
 class IndexedListSpec extends SparkCassandraITWordSpecBase with DefaultCluster with SaiCollectionBaseSpec {
   override def beforeClass {
-    dseFrom(V6_8_3) {
+    dseFrom(DSE_V6_8_3) {
       conn.withSessionDo { session =>
         createKeyspace(session, ks)
         session.execute(
diff --git a/connector/src/it/scala/com/datastax/spark/connector/cql/sai/IndexedMapSpec.scala b/connector/src/it/scala/com/datastax/spark/connector/cql/sai/IndexedMapSpec.scala
index f624d66d2..584f5fcab 100644
--- a/connector/src/it/scala/com/datastax/spark/connector/cql/sai/IndexedMapSpec.scala
+++ b/connector/src/it/scala/com/datastax/spark/connector/cql/sai/IndexedMapSpec.scala
@@ -1,15 +1,15 @@
 package com.datastax.spark.connector.cql.sai
 import com.datastax.spark.connector.SparkCassandraITWordSpecBase
-import com.datastax.spark.connector.ccm.CcmConfig.V6_8_3
+import com.datastax.spark.connector.ccm.CcmConfig.DSE_V6_8_3
 import com.datastax.spark.connector.cluster.DefaultCluster
-import org.apache.spark.sql.functions.{col, _}
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.sources.EqualTo
 class IndexedMapSpec extends SparkCassandraITWordSpecBase with DefaultCluster with SaiBaseSpec {
   override def beforeClass {
-    dseFrom(V6_8_3) {
+    dseFrom(DSE_V6_8_3) {
       conn.withSessionDo { session =>
         createKeyspace(session)
         session.execute(
@@ -41,66 +41,66 @@ class IndexedMapSpec extends SparkCassandraITWordSpecBase with DefaultCluster wi
   // TODO: SPARKC-630
   "Index on map keys" ignore {
-    "allow for contains predicate push down on keys" in dseFrom(V6_8_3) {
+    "allow for contains predicate push down on keys" in dseFrom(DSE_V6_8_3) {
       // SELECT * from test_storage_attached_index_spec.map_test WHERE map_col_1 CONTAINS KEY 2;
       assertPushDown(df("map_test").filter(array_contains(map_keys(col("map_col_1")), 2)))
     }
-    "not allow for contains predicate push down on values" in dseFrom(V6_8_3) {
+    "not allow for contains predicate push down on values" in dseFrom(DSE_V6_8_3) {
       assertNoPushDown(df("map_test").filter(array_contains(map_values(col("map_col_1")), 2)))
     }
-    "not allow for equality predicate push down" in dseFrom(V6_8_3) {
+    "not allow for equality predicate push down" in dseFrom(DSE_V6_8_3) {
      assertNoPushDown(df("map_test").filter(col("map_col_1").getItem(5) === 4))
     }
   }
   // TODO: SPARKC-630
   "Index on map values" ignore {
-    "allow for contains predicate push down on values" in dseFrom(V6_8_3) {
+    "allow for contains predicate push down on values" in dseFrom(DSE_V6_8_3) {
       // SELECT * from test_storage_attached_index_spec.map_test WHERE map_col_2 CONTAINS 2;
       assertPushDown(df("map_test").filter(array_contains(map_values(col("map_col_2")), 2)))
     }
-    "not allow for contains predicate push down on keys" in dseFrom(V6_8_3) {
+    "not allow for contains predicate push down on keys" in dseFrom(DSE_V6_8_3) {
       assertNoPushDown(df("map_test").filter(array_contains(map_keys(col("map_col_2")), 2)))
     }
-    "not allow for equality predicate push down" in dseFrom(V6_8_3) {
+    "not allow for equality predicate push down" in dseFrom(DSE_V6_8_3) {
       assertNoPushDown(df("map_test").filter(col("map_col_2").getItem(5) === 4))
     }
   }
   // TODO: SPARKC-630
   "Index on map entries" ignore {
-    "allow for equality predicate push down" in dseFrom(V6_8_3) {
+    "allow for equality predicate push down" in dseFrom(DSE_V6_8_3) {
       // SELECT * from test_storage_attached_index_spec.map_test WHERE map_col_3[5] = 4;
       assertPushDown(df("map_test").filter(col("map_col_3").getItem(5) === 4))
     }
-    "not allow for predicate push down different than equality" in dseFrom(V6_8_3) {
+    "not allow for predicate push down different than equality" in dseFrom(DSE_V6_8_3) {
       assertNoPushDown(df("map_test").filter(col("map_col_3").getItem(5) > 3))
       assertNoPushDown(df("map_test").filter(col("map_col_3").getItem(5) >= 3))
     }
-    "not allow for contains predicate push down on keys" in dseFrom(V6_8_3) {
+    "not allow for contains predicate push down on keys" in dseFrom(DSE_V6_8_3) {
       assertNoPushDown(df("map_test").filter(array_contains(map_keys(col("map_col_3")), 2)))
     }
-    "not allow for contains predicate push down on values" in dseFrom(V6_8_3) {
+    "not allow for contains predicate push down on values" in dseFrom(DSE_V6_8_3) {
       assertNoPushDown(df("map_test").filter(array_contains(map_values(col("map_col_3")), 2)))
     }
   }
   // TODO: SPARKC-630: Unsupported literal type class scala.collection.immutable.Map$Map1 Map(102 -> 112)
   "Index on full map" ignore {
-    "allow for equality predicate push down" in dseFrom(V6_8_3) {
+    "allow for equality predicate push down" in dseFrom(DSE_V6_8_3) {
       val data = df("map_test").filter(col("map_col_4") === Map(102 -> 112))
       // spark changes array to wrapped array
       assertPushedPredicate(data, pushedPredicate = EqualTo("map_col_4", Map(102 -> 112)))
     }
-    "allow for only one equality predicate push down when more than one is provided" in dseFrom(V6_8_3) {
+    "allow for only one equality predicate push down when more than one is provided" in dseFrom(DSE_V6_8_3) {
       val data = df("map_test").filter(col("map_col_4") === Map(102 -> 112) and col("map_col_4") === Map(107 -> 117))
       assertPushedPredicate(data, pushedPredicate = EqualTo("map_col_4", Map(102 -> 112)))
     }
diff --git a/connector/src/it/scala/com/datastax/spark/connector/cql/sai/IndexedNumericSpec.scala b/connector/src/it/scala/com/datastax/spark/connector/cql/sai/IndexedNumericSpec.scala
index ca0e73113..4a085f205 100644
--- a/connector/src/it/scala/com/datastax/spark/connector/cql/sai/IndexedNumericSpec.scala
+++ b/connector/src/it/scala/com/datastax/spark/connector/cql/sai/IndexedNumericSpec.scala
@@ -4,7 +4,7 @@ import java.sql.{Date, Timestamp}
 import java.time.{LocalDate, LocalTime}
 import com.datastax.spark.connector.SparkCassandraITWordSpecBase
-import com.datastax.spark.connector.ccm.CcmConfig.V6_8_3
+import com.datastax.spark.connector.ccm.CcmConfig.DSE_V6_8_3
 import com.datastax.spark.connector.cluster.DefaultCluster
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.sources._
@@ -15,7 +15,7 @@ class IndexedNumericSpec extends SparkCassandraITWordSpecBase with DefaultCluste
   private val numericTypes = Seq("int", "date", "inet", "time", "timestamp", "timeuuid") // and others
   override def beforeClass {
-    dseFrom(V6_8_3) {
+    dseFrom(DSE_V6_8_3) {
       conn.withSessionDo { session =>
         createKeyspace(session)
@@ -33,37 +33,37 @@ class IndexedNumericSpec extends SparkCassandraITWordSpecBase with DefaultCluste
   "Index on a numeric column" should {
-    "allow for equality predicate push down" in dseFrom(V6_8_3) {
+    "allow for equality predicate push down" in dseFrom(DSE_V6_8_3) {
       val data = df("numeric_types_test").filter(col("int_col") === 7)
       assertPushedPredicate(data, pushedPredicate = EqualTo("int_col", 7))
       data.count() shouldBe 1
     }
-    "allow for gte predicate push down" in dseFrom(V6_8_3) {
+    "allow for gte predicate push down" in dseFrom(DSE_V6_8_3) {
       val data = df("numeric_types_test").filter(col("int_col") >= 7)
       assertPushedPredicate(data, pushedPredicate = GreaterThanOrEqual("int_col", 7))
       data.count() shouldBe 3
     }
-    "allow for gt predicate push down" in dseFrom(V6_8_3) {
+    "allow for gt predicate push down" in dseFrom(DSE_V6_8_3) {
       val data = df("numeric_types_test").filter(col("int_col") > 7)
       assertPushedPredicate(data, pushedPredicate = GreaterThan("int_col", 7))
       data.count() shouldBe 2
     }
-    "allow for lt predicate push down" in dseFrom(V6_8_3) {
+    "allow for lt predicate push down" in dseFrom(DSE_V6_8_3) {
      val data = df("numeric_types_test").filter(col("int_col") < 7)
       assertPushedPredicate(data, pushedPredicate = LessThan("int_col", 7))
       data.count() shouldBe 7
     }
-    "allow for lte predicate push down" in dseFrom(V6_8_3) {
+    "allow for lte predicate push down" in dseFrom(DSE_V6_8_3) {
       val data = df("numeric_types_test").filter(col("int_col") <= 7)
       assertPushedPredicate(data, pushedPredicate = LessThanOrEqual("int_col", 7))
       data.count() shouldBe 8
     }
-    "allow for more than one range predicate push down" in dseFrom(V6_8_3) {
+    "allow for more than one range predicate push down" in dseFrom(DSE_V6_8_3) {
       val data = df("numeric_types_test").filter(col("int_col") >= 7 and col("int_col") < 9)
       assertPushedPredicate(
         data,
@@ -71,33 +71,33 @@ class IndexedNumericSpec extends SparkCassandraITWordSpecBase with DefaultCluste
       data.count() shouldBe 2
     }
-    "allow only for equality push down if range and equality predicates are defined" in dseFrom(V6_8_3) {
+    "allow only for equality push down if range and equality predicates are defined" in dseFrom(DSE_V6_8_3) {
       val data = df("numeric_types_test").filter(col("int_col") === 2 and col("int_col") < 4)
       assertPushedPredicate(data, pushedPredicate = EqualTo("int_col", 2))
       data.count() shouldBe 1
     }
-    "allow only for a single equality push down if there are more than one" in dseFrom(V6_8_3) {
+    "allow only for a single equality push down if there are more than one" in dseFrom(DSE_V6_8_3) {
       val data = df("numeric_types_test").filter(col("int_col") === 5 and col("int_col") === 8)
       assertPushedPredicate(data, pushedPredicate = EqualTo("int_col", 5))
       data.count() shouldBe 0
     }
-    "not allow for a predicate push down if OR clause is used" in dseFrom(V6_8_3) {
+    "not allow for a predicate push down if OR clause is used" in dseFrom(DSE_V6_8_3) {
       assertNoPushDown(df("numeric_types_test").filter(col("int_col") >= 7 or col("int_col") === 4))
     }
   }
   /** Minimal check to verify if non-obvious numeric types are supported */
   private def indexOnNumericColumn(columnName: String, value: Any): Unit = {
-    "allow for equality predicate push down" in dseFrom(V6_8_3) {
+    "allow for equality predicate push down" in dseFrom(DSE_V6_8_3) {
       val data = df("numeric_types_test").filter(col(columnName) === value)
       assertPushedPredicate(data, pushedPredicate = EqualTo(columnName, value))
       data.count() shouldBe 1
     }
-    "allow for gte predicate push down" in dseFrom(V6_8_3) {
+    "allow for gte predicate push down" in dseFrom(DSE_V6_8_3) {
       val data = df("numeric_types_test").filter(col(columnName) >= value)
       assertPushedPredicate(data, pushedPredicate = GreaterThanOrEqual(columnName, value))
       data.count() shouldBe 3
diff --git a/connector/src/it/scala/com/datastax/spark/connector/cql/sai/IndexedSetSpec.scala b/connector/src/it/scala/com/datastax/spark/connector/cql/sai/IndexedSetSpec.scala
index b9edd2ae0..a89f38982 100644
--- a/connector/src/it/scala/com/datastax/spark/connector/cql/sai/IndexedSetSpec.scala
+++ b/connector/src/it/scala/com/datastax/spark/connector/cql/sai/IndexedSetSpec.scala
@@ -1,14 +1,14 @@
 package com.datastax.spark.connector.cql.sai
 import com.datastax.spark.connector.SparkCassandraITWordSpecBase
-import com.datastax.spark.connector.ccm.CcmConfig.V6_8_3
+import com.datastax.spark.connector.ccm.CcmConfig.DSE_V6_8_3
 import com.datastax.spark.connector.cluster.DefaultCluster
 class IndexedSetSpec extends SparkCassandraITWordSpecBase with DefaultCluster with SaiCollectionBaseSpec {
   override def beforeClass {
-    dseFrom(V6_8_3) {
+    dseFrom(DSE_V6_8_3) {
       conn.withSessionDo { session =>
         createKeyspace(session, ks)
         session.execute(
diff --git a/connector/src/it/scala/com/datastax/spark/connector/cql/sai/IndexedStaticSpec.scala b/connector/src/it/scala/com/datastax/spark/connector/cql/sai/IndexedStaticSpec.scala
index 7e151eb67..678663b2f 100644
--- a/connector/src/it/scala/com/datastax/spark/connector/cql/sai/IndexedStaticSpec.scala
+++ b/connector/src/it/scala/com/datastax/spark/connector/cql/sai/IndexedStaticSpec.scala
@@ -1,7 +1,7 @@
 package com.datastax.spark.connector.cql.sai
 import com.datastax.spark.connector.SparkCassandraITWordSpecBase
-import com.datastax.spark.connector.ccm.CcmConfig.V6_8_3
+import com.datastax.spark.connector.ccm.CcmConfig.DSE_V6_8_3
 import com.datastax.spark.connector.cluster.DefaultCluster
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.sources._
@@ -9,7 +9,7 @@
 class IndexedStaticSpec extends SparkCassandraITWordSpecBase with DefaultCluster with SaiBaseSpec {
   override def beforeClass {
-    dseFrom(V6_8_3) {
+    dseFrom(DSE_V6_8_3) {
       conn.withSessionDo { session =>
         createKeyspace(session, ks)
         session.execute(
@@ -28,11 +28,11 @@ class IndexedStaticSpec extends SparkCassandraITWordSpecBase with DefaultCluster
     }
   }
   "Index on static columns" should {
-    "allow for predicate push down" in dseFrom(V6_8_3) {
+    "allow for predicate push down" in dseFrom(DSE_V6_8_3) {
       assertPushDown(df("static_test").filter(col("static_col_1") === 1))
     }
-    "not cause predicate push down for non-indexed static columns" in dseFrom(V6_8_3) {
+    "not cause predicate push down for non-indexed static columns" in dseFrom(DSE_V6_8_3) {
       assertNoPushDown(df("static_test").filter(col("static_col_2") === 1))
     }
   }
diff --git a/connector/src/it/scala/com/datastax/spark/connector/cql/sai/IndexedStringSpec.scala b/connector/src/it/scala/com/datastax/spark/connector/cql/sai/IndexedStringSpec.scala
index 26106ced0..8cb9335f9 100644
--- a/connector/src/it/scala/com/datastax/spark/connector/cql/sai/IndexedStringSpec.scala
+++ b/connector/src/it/scala/com/datastax/spark/connector/cql/sai/IndexedStringSpec.scala
@@ -1,7 +1,7 @@
 package com.datastax.spark.connector.cql.sai
 import com.datastax.spark.connector.SparkCassandraITWordSpecBase
-import com.datastax.spark.connector.ccm.CcmConfig.V6_8_3
+import com.datastax.spark.connector.ccm.CcmConfig.DSE_V6_8_3
 import com.datastax.spark.connector.cluster.DefaultCluster
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.sources._
@@ -12,7 +12,7 @@ class IndexedStringSpec extends SparkCassandraITWordSpecBase with DefaultCluster
   private val stringTypes = Seq("text", "ascii", "varchar", "uuid")
   override def beforeClass {
-    dseFrom(V6_8_3) {
+    dseFrom(DSE_V6_8_3) {
       conn.withSessionDo { session =>
         createKeyspace(session)
@@ -27,25 +27,25 @@ class IndexedStringSpec extends SparkCassandraITWordSpecBase with DefaultCluster
   }
   private def indexOnStringColumn(columnName: String): Unit = {
-    "allow for equality predicate push down" in dseFrom(V6_8_3) {
+    "allow for equality predicate push down" in dseFrom(DSE_V6_8_3) {
       val data = df("text_types_test").filter(col(columnName) === "text3")
       assertPushedPredicate(data, pushedPredicate = EqualTo(columnName, "text3"))
       data.count() shouldBe 1
     }
-    "not allow for contains predicate push down" in dseFrom(V6_8_3) {
+    "not allow for contains predicate push down" in dseFrom(DSE_V6_8_3) {
       val data = df("text_types_test").filter(col(columnName) contains "text5")
       assertNoPushDown(data)
       data.count() shouldBe 1
     }
-    "not allow for range predicate push down" in dseFrom(V6_8_3) {
+    "not allow for range predicate push down" in dseFrom(DSE_V6_8_3) {
       val data = df("text_types_test").filter(col(columnName) < "text4")
       assertNoPushDown(data)
       data.count() shouldBe 4
     }
-    "allow only for a single equality push down if there are more than one" in dseFrom(V6_8_3) {
+    "allow only for a single equality push down if there are more than one" in dseFrom(DSE_V6_8_3) {
       val data = df("text_types_test").filter(col(columnName) === "text1" and col(columnName) === "text2")
       assertPushedPredicate(data, pushedPredicate = EqualTo(columnName, "text1"))
       data.count() shouldBe 0
@@ -65,13 +65,13 @@ class IndexedStringSpec extends SparkCassandraITWordSpecBase with DefaultCluster
   }
   "Index on a uuid column" should {
-    "allow for equality predicate push down" in dseFrom(V6_8_3) {
+    "allow for equality predicate push down" in dseFrom(DSE_V6_8_3) {
       val data = df("text_types_test").filter(col("uuid_col") === "123e4567-e89b-12d3-a456-426614174003")
       assertPushedPredicate(data, pushedPredicate = EqualTo("uuid_col", "123e4567-e89b-12d3-a456-426614174003"))
       data.count() shouldBe 1
     }
-    "not allow for range predicate push down" in dseFrom(V6_8_3) {
+    "not allow for range predicate push down" in dseFrom(DSE_V6_8_3) {
       val data = df("text_types_test").filter(col("uuid_col") < "123e4567-e89b-12d3-a456-426614174004")
       assertNoPushDown(data)
       data.count() shouldBe 4
diff --git a/connector/src/it/scala/com/datastax/spark/connector/cql/sai/SaiCollectionBaseSpec.scala b/connector/src/it/scala/com/datastax/spark/connector/cql/sai/SaiCollectionBaseSpec.scala
index e091df7b6..e68ea222b 100644
--- a/connector/src/it/scala/com/datastax/spark/connector/cql/sai/SaiCollectionBaseSpec.scala
+++ b/connector/src/it/scala/com/datastax/spark/connector/cql/sai/SaiCollectionBaseSpec.scala
@@ -1,6 +1,6 @@
 package com.datastax.spark.connector.cql.sai
-import com.datastax.spark.connector.ccm.CcmConfig.V6_8_3
+import com.datastax.spark.connector.ccm.CcmConfig.DSE_V6_8_3
 import org.apache.spark.sql.functions.{array_contains, col}
 import org.apache.spark.sql.sources.EqualTo
 import org.scalatest.WordSpec
@@ -12,21 +12,21 @@ trait SaiCollectionBaseSpec extends SaiBaseSpec {
   this: WordSpec =>
   def indexOnANonFrozenCollection(table: String, column: String): Unit = {
-    "allow for contains predicate push down" in dseFrom(V6_8_3) {
+    "allow for contains predicate push down" in dseFrom(DSE_V6_8_3) {
       val data = df(table).filter(array_contains(col(column), 107))
       // TODO: SPARKC-630
       assertPushDown(data)
       data.count shouldBe 1
     }
-    "allow for multiple contains predicate push down" in dseFrom(V6_8_3) {
+    "allow for multiple contains predicate push down" in dseFrom(DSE_V6_8_3) {
       val data = df(table).filter(array_contains(col(column), 100) and array_contains(col(column), 110))
       // TODO: SPARKC-630
       assertPushDown(data)
       data.count() shouldBe 1
     }
-    "not allow for equal predicate push down" in dseFrom(V6_8_3) {
+    "not allow for equal predicate push down" in dseFrom(DSE_V6_8_3) {
       val data = df(table).filter(col(column) === Array(100, 110))
       assertNoPushDown(data)
       data.count() shouldBe 1
@@ -34,27 +34,27 @@ trait SaiCollectionBaseSpec extends SaiBaseSpec {
   }
   def indexOnAFrozenCollection(table: String, column: String): Unit = {
-    "allow for equality predicate push down" in dseFrom(V6_8_3) {
+    "allow for equality predicate push down" in dseFrom(DSE_V6_8_3) {
       val data = df(table).filter(col(column) === Array(102, 112))
       // spark changes array to wrapped array
       assertPushedPredicate(data, pushedPredicate = EqualTo(column, mutable.WrappedArray.make(Array(102, 112))))
       data.count() shouldBe 1
     }
-    "allow for equality predicate push down for pk column" in dseFrom(V6_8_3) {
+    "allow for equality predicate push down for pk column" in dseFrom(DSE_V6_8_3) {
       val data = df(table).filter(col("pk_1") === Array(102, 112))
       assertPushedPredicate(data, pushedPredicate = EqualTo("pk_1", mutable.WrappedArray.make(Array(102, 112))))
       data.count() shouldBe 1
     }
-    "allow for only one equality predicate push down when more than one is provided" in dseFrom(V6_8_3) {
+    "allow for only one equality predicate push down when more than one is provided" in dseFrom(DSE_V6_8_3) {
       val data = df(table).filter(col(column) === Array(102, 112) and col(column) === Array(107, 117))
       assertPushedPredicate(data, pushedPredicate = EqualTo(column, mutable.WrappedArray.make(Array(102, 112))))
       data.count() shouldBe 0
     }
-    "not allow for contains predicate push down" in dseFrom(V6_8_3) {
+    "not allow for contains predicate push down" in dseFrom(DSE_V6_8_3) {
       val data = df(table).filter(array_contains(col(column), 107))
       assertNoPushDown(data)
       data.count() shouldBe 1
diff --git a/connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDSpec.scala b/connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDSpec.scala
index 7fffee04a..3a9ac7e90 100644
--- a/connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDSpec.scala
+++ b/connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDSpec.scala
@@ -4,20 +4,22 @@ import java.io.IOException
 import java.time.{Instant, LocalDate, ZoneId, ZonedDateTime}
 import java.util.Date
 import com.datastax.oss.driver.api.core.DefaultProtocolVersion._
+import com.datastax.oss.driver.api.core.Version.V4_0_0
 import com.datastax.oss.driver.api.core.config.DefaultDriverOption
 import com.datastax.oss.driver.api.core.cql.SimpleStatement
 import com.datastax.oss.driver.api.core.cql.SimpleStatement._
 import com.datastax.spark.connector._
-import com.datastax.spark.connector.ccm.CcmConfig.{V3_6_0, V4_0_0, V6_7_0}
+import com.datastax.spark.connector.ccm.CcmConfig.{DSE_V6_7_0, V3_6_0}
 import com.datastax.spark.connector.cluster.DefaultCluster
 import com.datastax.spark.connector.cql.{CassandraConnector, CassandraConnectorConf}
 import com.datastax.spark.connector.mapper.{DefaultColumnMapper, JavaBeanColumnMapper, JavaTestBean, JavaTestUDTBean}
 import com.datastax.spark.connector.rdd.partitioner.dht.TokenFactory
 import com.datastax.spark.connector.types.{CassandraOption, TypeConverter}
 import com.datastax.spark.connector.util.RuntimeUtil
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException
-import scala.jdk.CollectionConverters._
 import scala.concurrent.Future
+import scala.jdk.CollectionConverters._
 import scala.reflect.runtime.universe.typeTag
 case class KeyValue(key: Int, group: Long, value: String)
@@ -1108,14 +1110,13 @@ class CassandraRDDSpec extends SparkCassandraITFlatSpecBase with DefaultCluster
     )
   }
-  it should "throw a meaningful exception when reading a table view" in from(cassandra = V4_0_0, dse = V6_7_0) {
+  it should "throw a meaningful exception when reading a table view" in from(cassandra = V4_0_0, dse = DSE_V6_7_0) {
     import org.apache.spark.sql.cassandra._
-    val ex = intercept[IllegalArgumentException] {
+    intercept[NoSuchNamespaceException] {
       val data = spark.read.cassandraFormat("sstable_tasks", "system_views").load()
       data.show()
     }
-    ex.getMessage should contain("Table views are not supported")
   }
   it should "throw an exception when trying to write to a Materialized View" in skipIfProtocolVersionLT(V4){
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index d6d43598e..e85fc24c7 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -31,7 +31,7 @@ object Dependencies
       .exclude("org.slf4j", "log4j-over-slf4j")
     def driverCoreExclude(): ModuleID = module
-      .exclude("com.datastax.oss", "java-driver-core") // doesn't shade guava
+      .exclude("org.apache.cassandra", "java-driver-core") // doesn't shade guava
       .exclude("org.apache.tinkerpop", "*")
       // until SPARK-20075 is fixed we fallback to java workarounds for native calls
       .exclude("com.github.jnr", "jnr-posix")
@@ -42,7 +42,8 @@ object Dependencies
     val junit = "junit" % "junit" % JUnit
     val junitInterface = "com.novocode" % "junit-interface" % JUnitInterface
     val scalaTest = "org.scalatest" %% "scalatest" % ScalaTest
-    val driverMapperProcessor = "com.datastax.oss" % "java-driver-mapper-processor" % DataStaxJavaDriver
+    val driverMapperProcessor = "org.apache.cassandra" % "java-driver-mapper-processor" % CassandraJavaDriver
+    val esriGeometry = "com.esri.geometry" % "esri-geometry-api" % EsriGeometry
   }
   object TestConnector {
@@ -61,7 +62,8 @@ object Dependencies
       TestCommon.scalaTest % "test,it",
       TestCommon.mockito % "test,it",
       TestCommon.junit % "test,it",
-      TestCommon.junitInterface % "test,it").map(_.logbackExclude())
+      TestCommon.junitInterface % "test,it",
+      TestCommon.esriGeometry % "test,it").map(_.logbackExclude())
   }
   // Required for metrics
@@ -73,8 +75,8 @@ object Dependencies
   }
   object Driver {
-    val driverCore = "com.datastax.oss" % "java-driver-core-shaded" % DataStaxJavaDriver driverCoreExclude()
-    val driverMapper = "com.datastax.oss" % "java-driver-mapper-runtime" % DataStaxJavaDriver driverCoreExclude()
+    val driverCore = "org.apache.cassandra" % "java-driver-core-shaded" % CassandraJavaDriver driverCoreExclude()
+    val driverMapper = "org.apache.cassandra" % "java-driver-mapper-runtime" % CassandraJavaDriver driverCoreExclude()
     val commonsLang3 = "org.apache.commons" % "commons-lang3" % Versions.CommonsLang3
     val paranamer = "com.thoughtworks.paranamer" % "paranamer" % Versions.Paranamer
diff --git a/project/Testing.scala b/project/Testing.scala
index e47d27ccb..f439aa3fc 100644
--- a/project/Testing.scala
+++ b/project/Testing.scala
@@ -49,19 +49,19 @@ object Testing {
    /**
     * CCM_IS_DSE env setting takes precedence over this guess.
     */
   private def isDse(version: Option[String]): Boolean = {
-    version.flatMap(v => Try(v.split('.').head.toInt >= 5).toOption).getOrElse(false)
+    version.exists(_.startsWith("dse-"))
   }
   def getCCMJvmOptions = {
     val dbVersion = sys.env.get("CCM_CASSANDRA_VERSION")
-    val ccmCassVersion = dbVersion.map(version => s"-Dccm.version=$version")
-    val ccmCassVersion2 = dbVersion.map(version => s"-Dcassandra.version=$version")
+    val ccmVersion = dbVersion.map(version => s"-Dccm.version=${version.stripPrefix("dse-")}")
+    val cassandraVersion = dbVersion.map(version => s"-Dcassandra.version=${version.stripPrefix("dse-")}")
     val ccmDse = sys.env.get("CCM_IS_DSE").map(_.toLowerCase == "true").orElse(Some(isDse(dbVersion)))
       .map(isDSE => s"-Dccm.dse=$isDSE")
     val cassandraDirectory = sys.env.get("CCM_INSTALL_DIR").map(dir => s"-Dcassandra.directory=$dir")
     val ccmJava = sys.env.get("CCM_JAVA_HOME").orElse(sys.env.get("JAVA_HOME")).map(dir => s"-Dccm.java.home=$dir")
     val ccmPath = sys.env.get("CCM_JAVA_HOME").orElse(sys.env.get("JAVA_HOME")).map(dir => s"-Dccm.path=$dir/bin")
-    val options = Seq(ccmCassVersion, ccmDse, ccmCassVersion2, cassandraDirectory, ccmJava, ccmPath)
+    val options = Seq(ccmVersion, ccmDse, cassandraVersion, cassandraDirectory, ccmJava, ccmPath)
     options
   }
diff --git a/project/Versions.scala b/project/Versions.scala
index 7ac8abab5..1afbd6ff0 100644
--- a/project/Versions.scala
+++ b/project/Versions.scala
@@ -5,7 +5,8 @@ object Versions {
   val CommonsLang3 = "3.10"
   val Paranamer = "2.8"
-  val DataStaxJavaDriver = "4.13.0"
+  val CassandraJavaDriver = "4.18.0"
+  val EsriGeometry = "2.2.4"
   val ScalaCheck = "1.14.0"
   val ScalaTest = "3.0.8"
diff --git a/project/build.properties b/project/build.properties
index c0bab0494..081fdbbc7 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -1 +1 @@
-sbt.version=1.2.8
+sbt.version=1.10.0
diff --git a/sbt/sbt b/sbt/sbt
index ed4ee4586..75c5cb8ab 100755
--- a/sbt/sbt
+++ b/sbt/sbt
@@ -20,8 +20,8 @@
 # This script launches sbt for this project. If present it uses the system
 # version of sbt. If there is no system version of sbt it attempts to download
 # sbt locally.
-SBT_VERSION=1.3.3
-URL=https://repo1.maven.org/maven2/org/scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
+SBT_VERSION=1.10.0
+URL=https://repo1.maven.org/maven2/org/scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch-${SBT_VERSION}.jar
 JAR=sbt/sbt-launch-${SBT_VERSION}.jar
 # Download sbt launch jar if it hasn't been downloaded yet
diff --git a/test-support/src/main/scala/com/datastax/spark/connector/ccm/CcmBridge.scala b/test-support/src/main/scala/com/datastax/spark/connector/ccm/CcmBridge.scala
index 7312c6600..68b812f02 100644
--- a/test-support/src/main/scala/com/datastax/spark/connector/ccm/CcmBridge.scala
+++ b/test-support/src/main/scala/com/datastax/spark/connector/ccm/CcmBridge.scala
@@ -50,11 +50,11 @@ class CcmBridge(config: CcmConfig) extends AutoCloseable {
   }
   def dsetool(n: Int, args: String*): Unit = {
-    execute(s"node$n dsetool ${args.mkString(" ")}")
+    execute(Seq(s"node$n", "dsetool") ++ args: _*)
   }
   def nodetool(n: Int, args: String*): Unit = {
-    execute(s"node$n nodetool ${args.mkString(" < ")}")
+    execute(Seq(s"node$n", "nodetool") ++ args: _*)
   }
   def refreshSizeEstimates(n: Int): Unit = {
@@ -71,6 +71,12 @@ object CcmBridge {
   private val logger: Logger = LoggerFactory.getLogger(classOf[CcmBridge])
+  def execute(cli: Seq[String]): Seq[String] = {
+    val cmdLine = new CommandLine(cli.head)
+    cli.tail.foreach(cmdLine.addArgument)
+    execute(cmdLine)
+  }
+
   def execute(cli: CommandLine): Seq[String] = {
     logger.info("Executing: " + cli)
diff --git a/test-support/src/main/scala/com/datastax/spark/connector/ccm/CcmConfig.scala b/test-support/src/main/scala/com/datastax/spark/connector/ccm/CcmConfig.scala
index a1a83c102..ef208d29f 100644
--- a/test-support/src/main/scala/com/datastax/spark/connector/ccm/CcmConfig.scala
+++ b/test-support/src/main/scala/com/datastax/spark/connector/ccm/CcmConfig.scala
@@ -2,7 +2,7 @@ package com.datastax.spark.connector.ccm
 import java.io.{File, IOException, InputStream}
 import java.net.InetSocketAddress
-import java.nio.file.{Files, Path, Paths, StandardCopyOption}
+import java.nio.file.{Files, Paths, StandardCopyOption}
 import com.datastax.oss.driver.api.core.Version
 import com.datastax.spark.connector.ccm.CcmConfig._
@@ -19,10 +19,11 @@ case class CcmConfig(
     createOptions: List[String] = List(),
     dseWorkloads: List[String] = List(),
     jmxPortOffset: Int = 0,
-    version: Version = Version.parse(System.getProperty("ccm.version", "3.11.6")),
+    version: Version = Version.parse(System.getProperty("ccm.version", "4.1.4")),
     installDirectory: Option[String] = Option(System.getProperty("ccm.directory")),
     installBranch: Option[String] = Option(System.getProperty("ccm.branch")),
     dseEnabled: Boolean = Option(System.getProperty("ccm.dse")).exists(_.toLowerCase == "true"),
+    javaVersion: Option[Int] = None,
     mode: ClusterMode = ClusterModes.fromEnvVar) {
   def withSsl(keystorePath: String, keystorePassword: String): CcmConfig = {
@@ -56,11 +57,11 @@ case class CcmConfig(
       version
     } else {
       val stableVersion = version.nextStable()
-      if (stableVersion.compareTo(V6_0_0) >= 0) {
-        V4_0_0
-      } else if (stableVersion.compareTo(V5_1_0) >= 0) {
+      if (stableVersion.compareTo(DSE_V6_0_0) >= 0) {
+        Version.V4_0_0
+      } else if (stableVersion.compareTo(DSE_V5_1_0) >= 0) {
         V3_10
-      } else if (stableVersion.compareTo(V5_0_0) >= 0) {
+      } else if (stableVersion.compareTo(DSE_V5_0_0) >= 0) {
         V3_0_15
       } else {
         V2_1_19
@@ -116,16 +117,16 @@ object CcmConfig {
   val DEFAULT_SERVER_LOCALHOST_KEYSTORE_PATH: String = "/server_localhost.keystore"
   // DSE versions
-  val V6_8_5: Version = Version.parse("6.8.5")
-  val V6_8_3: Version = Version.parse("6.8.3")
-  val V6_8_0: Version = Version.parse("6.8.0")
-  val V6_7_0: Version = Version.parse("6.7.0")
-  val V6_0_0: Version = Version.parse("6.0.0")
-  val V5_1_0: Version = Version.parse("5.1.0")
-  val V5_0_0: Version = Version.parse("5.0.0")
+  val DSE_V6_8_5: Version = Version.parse("6.8.5")
+  val DSE_V6_8_3: Version = Version.parse("6.8.3")
+  val DSE_V6_7_0: Version = Version.parse("6.7.0")
+  val DSE_V6_0_0: Version = Version.parse("6.0.0")
+  val DSE_V5_1_0: Version = Version.parse("5.1.0")
+  val DSE_V5_0_0: Version = Version.parse("5.0.0")
   // C* versions
-  val V4_0_0: Version = Version.parse("4.0.0")
+  val V5_0_0: Version = Version.parse("5.0-alpha1")
+  val V4_1_0: Version = Version.parse("4.1.0")
   val V3_6_0: Version = Version.parse("3.6.0")
   val V3_10: Version = Version.parse("3.10")
   val V3_0_15: Version = Version.parse("3.0.15")
diff --git a/test-support/src/main/scala/com/datastax/spark/connector/ccm/mode/ClusterModeExecutor.scala b/test-support/src/main/scala/com/datastax/spark/connector/ccm/mode/ClusterModeExecutor.scala
index 993a8fe0b..605abaa33 100644
--- a/test-support/src/main/scala/com/datastax/spark/connector/ccm/mode/ClusterModeExecutor.scala
+++ b/test-support/src/main/scala/com/datastax/spark/connector/ccm/mode/ClusterModeExecutor.scala
@@ -13,6 +13,13 @@ private[ccm] trait ClusterModeExecutor {
   protected val dir: Path
+  protected val javaVersion: Option[Int] = config.javaVersion match {
+    case None if config.dseEnabled => Some(8)
+    case None if config.version.getMajor < 5 => Some(8)
+    case None => Some(11)
+    case other => other
+  }
+
   def create(clusterName: String): Unit
   def start(nodeNo: Int): Unit
@@ -20,8 +27,8 @@ private[ccm] trait ClusterModeExecutor {
   def remove(): Unit
   def execute(args: String*): Seq[String] = synchronized {
-    val command = s"ccm ${args.mkString(" ")} --config-dir=${dir.toFile.getAbsolutePath}"
-    CcmBridge.execute(CommandLine.parse(command))
+    val command = "ccm" +: args :+ s"--config-dir=${dir.toFile.getAbsolutePath}"
+    CcmBridge.execute(command)
   }
   def executeUnsanitized(args: String*): Seq[String] = synchronized {
@@ -45,20 +52,20 @@ private[ccm] trait ClusterModeExecutor {
   }
   def getLastLogLines(path: String, linesCount: Int): Seq[String] = synchronized {
-    val command = s"tail -$linesCount $path"
-    CcmBridge.execute(CommandLine.parse(command))
+    val command = Seq("tail", s"-$linesCount", path)
+    CcmBridge.execute(command)
   }
   /**
     * Waits for the node to become alive. The first check is performed after the first interval.
     */
-  def waitForNode(nodeNo: Int, timeout: FiniteDuration, interval: Duration = 5.seconds): Boolean = {
+  def waitForNode(nodeNo: Int, timeout: FiniteDuration, interval: Duration = 1.seconds): Boolean = {
     val deadline = timeout.fromNow
     while (!deadline.isOverdue()) {
-      Thread.sleep(interval.toMillis)
       if (isAlive(nodeNo, interval)) {
         return true
       }
+      Thread.sleep(interval.toMillis)
     }
     false;
   }
diff --git a/test-support/src/main/scala/com/datastax/spark/connector/ccm/mode/StandardModeExecutor.scala b/test-support/src/main/scala/com/datastax/spark/connector/ccm/mode/StandardModeExecutor.scala
index 829605518..9667afdfb 100644
--- a/test-support/src/main/scala/com/datastax/spark/connector/ccm/mode/StandardModeExecutor.scala
+++ b/test-support/src/main/scala/com/datastax/spark/connector/ccm/mode/StandardModeExecutor.scala
@@ -1,11 +1,11 @@
 package com.datastax.spark.connector.ccm.mode
-import java.io.{File, FileFilter}
+import java.io.File
 import java.nio.file.{Files, Path, Paths}
 import java.util.concurrent.atomic.AtomicBoolean
 import com.datastax.oss.driver.api.core.Version
 import com.datastax.spark.connector.ccm.CcmConfig
-import com.datastax.spark.connector.ccm.CcmConfig.V6_8_5
+import com.datastax.spark.connector.ccm.CcmConfig.DSE_V6_8_5
 import org.slf4j.{Logger, LoggerFactory}
 import scala.concurrent.duration.DurationInt
@@ -19,10 +19,12 @@ private[mode] trait DefaultExecutor extends ClusterModeExecutor {
   private def waitForNode(nodeNo: Int): Unit = {
     logger.info(s"Waiting for node $nodeNo to become alive...")
+    val t0 = System.currentTimeMillis()
     if (!waitForNode(nodeNo, 2.minutes)) {
       throw new IllegalStateException(s"Timeouted on waiting for node $nodeNo")
     }
-    logger.info(s"Node $nodeNo is alive")
+
+    logger.info(s"Node $nodeNo is alive in ${(System.currentTimeMillis() - t0) / 1000} s")
   }
   private def logStdErr(nodeNo: Int): Unit = {
@@ -32,16 +34,23 @@ private[mode] trait DefaultExecutor extends ClusterModeExecutor {
       if (logsDir.exists() && logsDir.isDirectory) {
         val stdErrFile = logsDir.listFiles().filter(_.getName.endsWith("stderr.log")).head
-        logger.error(s"Start command failed, here is the last $linesCount lines of startup-stderr file: \n" +
-          getLastLogLines(stdErrFile.getAbsolutePath, linesCount).mkString("\n"))
+        val stdOutFile = logsDir.listFiles().filter(_.getName.endsWith("stdout.log")).head
+        logger.error(s"logs dir: \n" + logsDir.listFiles().map(_.getName).mkString("\n"))
+        logger.error(s"Start command failed, here is the last $linesCount lines of startup-stderr and startup-stdout files: \n" +
+          getLastLogLines(stdErrFile.getAbsolutePath, linesCount).mkString("\n") + getLastLogLines(stdOutFile.getAbsolutePath, linesCount).mkString("\n"))
       }
     }
   }
   override def start(nodeNo: Int): Unit = {
-    val formattedJvmArgs = config.jvmArgs.map(arg => s" --jvm_arg=$arg").mkString(" ")
+    val jvmArgs = nodeNo match {
+      case 1 => config.jvmArgs :+ "-Dcassandra.superuser_setup_delay_ms=0" :+ "-Dcassandra.ring_delay_ms=1000" :+ "-Dcassandra.skip_sync=true"
+      case _ => config.jvmArgs :+ "-Dcassandra.ring_delay_ms=5000" :+ "-Dcassandra.skip_sync=true"
+    }
+    val formattedJvmArgs = jvmArgs.map(arg => s"--jvm_arg=$arg")
+    val formattedJvmVersion = javaVersion.map(v => s"--jvm-version=$v").toSeq
     try {
-      execute(s"node$nodeNo", "start", formattedJvmArgs + "-v", "--skip-wait-other-notice")
+      execute(Seq(s"node$nodeNo", "start", "-v", "--skip-wait-other-notice") ++ formattedJvmArgs ++ formattedJvmVersion :_*)
       waitForNode(nodeNo)
     } catch {
       case NonFatal(e) =>
@@ -68,17 +77,13 @@ private[mode] trait DefaultExecutor extends ClusterModeExecutor {
   }
   /**
-    * Remove this once C* 4.0.0 is released.
-    *
-    * This is a workaround that allows running it:test against 4.0.0-betaX and 4.0.0-rcX. These C* versions are
-    * published as 4.0-betaX and 4.0-rcX, lack of patch version breaks versioning convention used in integration tests.
+    * This is a workaround that allows running it:test against X.Y.0-betaZ and X.Y.0-rcZ. These C* versions are
+    * published as X.Y-betaZ and X.Y-rcZ, lack of patch version breaks versioning convention used in integration tests.
     */
   private def adjustCassandraBetaVersion(version: String): String = {
-    val beta = "4.0.0-beta(\\d+)".r
-    val rc = "4.0.0-rc(\\d+)".r
+    val betaOrRC = "(\\d+).(\\d+).0-(beta|rc)(\\d+)".r
     version match {
-      case beta(betaNo) => s"4.0-beta$betaNo"
-      case rc(rcNo) => s"4.0-rc$rcNo"
+      case betaOrRC(x, y, t, n) => s"$x.$y-$t$n"
       case other => other
     }
   }
@@ -87,12 +92,12 @@ private[mode] trait DefaultExecutor extends ClusterModeExecutor {
     if (created.compareAndSet(false, true)) {
       val options = config.installDirectory
         .map(dir => config.createOptions :+ s"--install-dir=${new File(dir).getAbsolutePath}")
-        .orElse(config.installBranch.map(branch => config.createOptions :+ s"-v git:${branch.trim().replaceAll("\"", "")}"))
-        .getOrElse(config.createOptions :+ s"-v ${adjustCassandraBetaVersion(config.version.toString)}")
+        .orElse(config.installBranch.map(branch => config.createOptions ++ Seq("-v", s"git:${branch.trim().replaceAll("\"", "")}")))
+        .getOrElse(config.createOptions ++ Seq("-v", adjustCassandraBetaVersion(config.version.toString)))
       val dseFlag = if (config.dseEnabled) Some("--dse") else None
-      val createArgs = Seq("create", clusterName, "-i", config.ipPrefix, (options ++ dseFlag).mkString(" "))
+      val createArgs = Seq("create", clusterName, "-i", config.ipPrefix) ++ options ++ dseFlag
       // Check installed Directory
       val repositoryDir = Paths.get(
@@ -133,7 +138,7 @@ private[mode] trait DefaultExecutor extends ClusterModeExecutor {
         execute(addArgs: _*)
-        if (config.dseEnabled && config.getDseVersion.exists(_.compareTo(V6_8_5) >= 0)) {
+        if (config.dseEnabled && config.getDseVersion.exists(_.compareTo(DSE_V6_8_5) >= 0)) {
           execute(node, "updateconf", s"metadata_directory:${dir.toFile.getAbsolutePath}/metadata$i")
         }
       }
@@ -141,8 +146,10 @@ private[mode] trait DefaultExecutor extends ClusterModeExecutor {
       config.cassandraConfiguration.foreach { case (key, value) =>
         execute("updateconf", s"$key:$value")
       }
-      if (config.getCassandraVersion.compareTo(Version.V2_2_0) >= 0) {
+      if (config.getCassandraVersion.compareTo(Version.V2_2_0) >= 0 && config.getCassandraVersion.compareTo(CcmConfig.V4_1_0) < 0) {
        execute("updateconf", "enable_user_defined_functions:true")
+      } else if (config.getCassandraVersion.compareTo(CcmConfig.V4_1_0) >= 0) {
+        execute("updateconf", "user_defined_functions_enabled:true")
       }
       if (config.dseEnabled) {
         config.dseConfiguration.foreach { case (key, value) =>
@@ -156,8 +163,10 @@ private[mode] trait DefaultExecutor extends ClusterModeExecutor {
         }
       } else {
         // C* 4.0.0 has materialized views disabled by default
-        if (config.getCassandraVersion.compareTo(Version.parse("4.0-beta1")) >= 0) {
+        if (config.getCassandraVersion.compareTo(Version.V4_0_0) >= 0 && config.getCassandraVersion.compareTo(CcmConfig.V4_1_0) < 0) {
           execute("updateconf", "enable_materialized_views:true")
+        } else if (config.getCassandraVersion.compareTo(CcmConfig.V4_1_0) >= 0) {
+          execute("updateconf", "materialized_views_enabled:true")
         }
       }
     }
@@ -166,6 +175,7 @@ private[mode] trait DefaultExecutor extends ClusterModeExecutor {
 private[ccm] class StandardModeExecutor(val config: CcmConfig) extends DefaultExecutor {
   override val dir: Path = Files.createTempDirectory("ccm")
+  // remove config directory on shutdown
   dir.toFile.deleteOnExit()
   // remove db artifacts