Skip to content

Commit

Permalink
SPARKC-706: Add basic support for Cassandra vectors (#1366)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacek-lewandowski authored Jun 21, 2024
1 parent 6c6ce1b commit 965b2dc
Show file tree
Hide file tree
Showing 21 changed files with 445 additions and 28 deletions.
3 changes: 3 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
3.5.1
* Support for Vector type available in Cassandra 5.0 (SPARKC-706)
* Upgrade Cassandra Java Driver to 4.18.1, support Cassandra 5.0 in test framework (SPARKC-710)

3.5.0
* Support for Apache Spark 3.5 (SPARKC-704)
Expand Down
21 changes: 14 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@

## Quick Links

| What | Where |
| ---------- |---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Community | Chat with us at [Apache Cassandra](https://cassandra.apache.org/_/community.html#discussions) |
| Scala Docs | Most Recent Release (3.5.0): [Connector API docs](https://datastax.github.io/spark-cassandra-connector/ApiDocs/3.5.0/connector/com/datastax/spark/connector/index.html), [Connector Driver docs](https://datastax.github.io/spark-cassandra-connector/ApiDocs/3.5.0/driver/com/datastax/spark/connector/index.html) |
| Latest Production Release | [3.5.0](https://search.maven.org/artifact/com.datastax.spark/spark-cassandra-connector_2.12/3.5.0/jar) |
| What | Where |
| ---------- |---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Community | Chat with us at [Apache Cassandra](https://cassandra.apache.org/_/community.html#discussions) |
| Scala Docs | Most Recent Release (3.5.1): [Connector API docs](https://datastax.github.io/spark-cassandra-connector/ApiDocs/3.5.1/connector/com/datastax/spark/connector/index.html), [Connector Driver docs](https://datastax.github.io/spark-cassandra-connector/ApiDocs/3.5.1/driver/com/datastax/spark/connector/index.html) |
| Latest Production Release | [3.5.1](https://search.maven.org/artifact/com.datastax.spark/spark-cassandra-connector_2.12/3.5.1/jar) |

## News
### 3.5.1
- The latest release of the Spark-Cassandra-Connector introduces support for vector types, greatly enhancing its capabilities. This new feature allows developers to seamlessly integrate and work with Cassandra 5.0 and Astra vectors within the Spark ecosystem. By supporting vector types, the connector now provides insights into AI and Retrieval-Augmented Generation (RAG) data, enabling more advanced and efficient data processing and analysis.

## Features

Expand Down Expand Up @@ -55,7 +59,7 @@ Currently, the following branches are actively supported:

| Connector | Spark | Cassandra | Cassandra Java Driver | Minimum Java Version | Supported Scala Versions |
|-----------|---------------|----------------------------|-----------------------|----------------------|--------------------------|
| 3.5.1 | 3.5 | 2.1.5*, 2.2, 3.x, 4.x, 5.0 | 4.18 | 8 | 2.12, 2.13 |
| 3.5.1 | 3.5 | 2.1.5*, 2.2, 3.x, 4.x, 5.0 | 4.18.1 | 8 | 2.12, 2.13 |
| 3.5 | 3.5 | 2.1.5*, 2.2, 3.x, 4.x | 4.13 | 8 | 2.12, 2.13 |
| 3.4 | 3.4 | 2.1.5*, 2.2, 3.x, 4.x | 4.13 | 8 | 2.12, 2.13 |
| 3.3 | 3.3 | 2.1.5*, 2.2, 3.x, 4.x | 4.13 | 8 | 2.12 |
Expand All @@ -80,6 +84,9 @@ Currently, the following branches are actively supported:
## Hosted API Docs
API documentation for the Scala and Java interfaces are available online:

### 3.5.1
* [Spark-Cassandra-Connector](https://datastax.github.io/spark-cassandra-connector/ApiDocs/3.5.1/connector/com/datastax/spark/connector/index.html)

### 3.5.0
* [Spark-Cassandra-Connector](https://datastax.github.io/spark-cassandra-connector/ApiDocs/3.5.0/connector/com/datastax/spark/connector/index.html)

Expand Down Expand Up @@ -111,7 +118,7 @@ This project is available on the Maven Central Repository.
For SBT to download the connector binaries, sources and javadoc, put this in your project
SBT config:

libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "3.5.0"
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "3.5.1"

* The default Scala version for Spark 3.0+ is 2.12 please choose the appropriate build. See the
[FAQ](doc/FAQ.md) for more information.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ trait SparkCassandraITSpecBase
}

override def withFixture(test: NoArgTest): Outcome = wrapUnserializableExceptions {
super.withFixture(test)
super.withFixture(test)
}

def getKsName = {
Expand Down Expand Up @@ -145,18 +145,32 @@ trait SparkCassandraITSpecBase
else report(s"Skipped Because ProtocolVersion $pv < $protocolVersion")
}

/** Skips the given test if the Cluster Version is lower or equal to the given `cassandra` Version or `dse` Version
* (if this is a DSE cluster) */
def from(cassandra: Version, dse: Version)(f: => Unit): Unit = {
/** Runs the given test only if the cluster type and version matches.
*
* @param cassandra run the test if the cluster is Cassandra >= the given version;
* if `None`, never run the test for Cassandra clusters
* @param dse run the test if the cluster is DSE >= the given version;
* if `None`, never run the test for DSE clusters
* @param f the test to run
*/
def from(cassandra: Version, dse: Version)(f: => Unit): Unit = from(Option(cassandra), Option(dse))(f)

def from(cassandra: Option[Version] = None, dse: Option[Version] = None)(f: => Unit): Unit = {
if (isDse(conn)) {
from(dse)(f)
dse match {
case Some(dseVersion) => from(dseVersion)(f)
case None => report(s"Skipped because not DSE")
}
} else {
from(cassandra)(f)
cassandra match {
case Some(cassandraVersion) => from(cassandraVersion)(f)
case None => report(s"Skipped because not Cassandra")
}
}
}

/** Skips the given test if the Cluster Version is lower or equal to the given version */
def from(version: Version)(f: => Unit): Unit = {
/** Skips the given test if the Cluster Version is lower than the given version */
private def from(version: Version)(f: => Unit): Unit = {
skip(cluster.getCassandraVersion, version) { f }
}

Expand All @@ -172,7 +186,7 @@ trait SparkCassandraITSpecBase
else f
}

/** Skips the given test if the Cluster Version is lower or equal to the given version or the cluster is not DSE */
/** Skips the given test if the Cluster Version is lower than the given version or the cluster is not DSE */
def dseFrom(version: Version)(f: => Any): Unit = {
dseOnly {
skip(cluster.getDseVersion.get, version) { f }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.datastax.spark.connector.cql

import com.datastax.spark.connector.SparkCassandraITWordSpecBase
import com.datastax.spark.connector.ccm.CcmConfig
import com.datastax.spark.connector.cluster.DefaultCluster
import com.datastax.spark.connector.types._
import com.datastax.spark.connector.util.schemaFromCassandra
Expand Down Expand Up @@ -49,6 +50,9 @@ class SchemaSpec extends SparkCassandraITWordSpecBase with DefaultCluster {
s"""CREATE INDEX test_d9_m23423ap_idx ON $ks.test (full(d10_set))""")
session.execute(
s"""CREATE INDEX test_d7_int_idx ON $ks.test (d7_int)""")
from(Some(CcmConfig.V5_0_0), None) {
session.execute(s"ALTER TABLE $ks.test ADD d17_vector frozen<vector<int,3>>")
}

for (i <- 0 to 9) {
session.execute(s"insert into $ks.test (k1,k2,k3,c1,c2,c3,d10_set) " +
Expand Down Expand Up @@ -111,8 +115,8 @@ class SchemaSpec extends SparkCassandraITWordSpecBase with DefaultCluster {

"allow to read regular column definitions" in {
val columns = table.regularColumns
columns.size shouldBe 16
columns.map(_.columnName).toSet shouldBe Set(
columns.size should be >= 16
columns.map(_.columnName).toSet should contain allElementsOf Set(
"d1_blob", "d2_boolean", "d3_decimal", "d4_double", "d5_float",
"d6_inet", "d7_int", "d8_list", "d9_map", "d10_set",
"d11_timestamp", "d12_uuid", "d13_timeuuid", "d14_varchar",
Expand All @@ -136,6 +140,9 @@ class SchemaSpec extends SparkCassandraITWordSpecBase with DefaultCluster {
table.columnByName("d14_varchar").columnType shouldBe VarCharType
table.columnByName("d15_varint").columnType shouldBe VarIntType
table.columnByName("d16_address").columnType shouldBe a [UserDefinedType]
from(Some(CcmConfig.V5_0_0), None) {
table.columnByName("d17_vector").columnType shouldBe VectorType(IntType, 3)
}
}

"allow to list fields of a user defined type" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import com.datastax.oss.driver.api.core.config.DefaultDriverOption
import com.datastax.oss.driver.api.core.cql.SimpleStatement
import com.datastax.oss.driver.api.core.cql.SimpleStatement._
import com.datastax.spark.connector._
import com.datastax.spark.connector.ccm.CcmConfig.{DSE_V6_7_0, V3_6_0}
import com.datastax.spark.connector.ccm.CcmConfig.{DSE_V5_1_0, DSE_V6_7_0, V3_6_0}
import com.datastax.spark.connector.cluster.DefaultCluster
import com.datastax.spark.connector.cql.{CassandraConnector, CassandraConnectorConf}
import com.datastax.spark.connector.mapper.{DefaultColumnMapper, JavaBeanColumnMapper, JavaTestBean, JavaTestUDTBean}
Expand Down Expand Up @@ -794,7 +794,7 @@ class CassandraRDDSpec extends SparkCassandraITFlatSpecBase with DefaultCluster
results should contain ((KeyGroup(3, 300), (3, 300, "0003")))
}

it should "allow the use of PER PARTITION LIMITs " in from(V3_6_0) {
it should "allow the use of PER PARTITION LIMITs " in from(cassandra = V3_6_0, dse = DSE_V5_1_0) {
val result = sc.cassandraTable(ks, "clustering_time").perPartitionLimit(1).collect
result.length should be (1)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import com.datastax.oss.driver.api.core.config.DefaultDriverOption._
import com.datastax.oss.driver.api.core.cql.{AsyncResultSet, BoundStatement}
import com.datastax.oss.driver.api.core.{DefaultConsistencyLevel, DefaultProtocolVersion}
import com.datastax.spark.connector._
import com.datastax.spark.connector.ccm.CcmConfig.V3_6_0
import com.datastax.spark.connector.ccm.CcmConfig.{DSE_V5_1_0, V3_6_0}
import com.datastax.spark.connector.cluster.DefaultCluster
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.embedded.SparkTemplate._
Expand Down Expand Up @@ -425,7 +425,7 @@ class RDDSpec extends SparkCassandraITFlatSpecBase with DefaultCluster {

}

it should "should be joinable with a PER PARTITION LIMIT limit" in from(V3_6_0){
it should "should be joinable with a PER PARTITION LIMIT limit" in from(cassandra = V3_6_0, dse = DSE_V5_1_0){
val source = sc.parallelize(keys).map(x => (x, x * 100))
val someCass = source
.joinWithCassandraTable(ks, wideTable, joinColumns = SomeColumns("key", "group"))
Expand Down
Loading

0 comments on commit 965b2dc

Please sign in to comment.