Skip to content

Commit

Permalink
Merge pull request #766 from datastax/SPARKC-218
Browse files Browse the repository at this point in the history
SPARKC-218 Fixed UDT collection column bug
  • Loading branch information
pkolaczk committed Jul 22, 2015
2 parents d4f2861 + 7103834 commit 1c02771
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,64 @@ class CassandraSQLSpec extends SparkCassandraITFlatSpecBase {
row.getString(1) should be ("name")
}

it should "allow to select UDT collection column and nested UDT column" in {
  // Strips the `|` margins and joins the lines of a CQL literal with single spaces.
  def oneLine(cql: String): String = cql.stripMargin.replaceAll("\n", " ")

  // UDT that itself contains a collection field.
  val createCategoryMetadataType = oneLine(
    """
      |CREATE TYPE IF NOT EXISTS sql_test.category_metadata (
      | category_id text,
      | metric_descriptors set <text>
      |)
      |""")

  // UDT that nests the UDT declared above.
  val createObjectMetadataType = oneLine(
    """
      |CREATE TYPE IF NOT EXISTS sql_test.object_metadata (
      | name text,
      | category_metadata frozen<category_metadata>,
      | bucket_size int
      |)
      |""")

  // UDT used as the element type of the `relations` set column.
  val createRelationType = oneLine(
    """
      |CREATE TYPE IF NOT EXISTS sql_test.relation (
      | type text,
      | object_type text,
      | related_to text,
      | obj_id text
      |)
      |""")

  // Table combining a nested UDT column with a collection-of-UDT column.
  val createObjectsTable = oneLine(
    """
      |CREATE TABLE IF NOT EXISTS sql_test.objects (
      | obj_id text,
      | metadata frozen<object_metadata>,
      | relations set<frozen<relation>>,
      | ts timestamp, PRIMARY KEY(obj_id)
      |)
      |""")

  // Single row; only obj_id, ts and metadata are written, `relations` is left unset.
  val insertObject = oneLine(
    """
      |INSERT INTO sql_test.objects (obj_id, ts, metadata)
      |values (
      | '123', '2015-06-16 15:53:23-0400',
      | {
      | name: 'foo',
      | category_metadata: {
      | category_id: 'thermostat',
      | metric_descriptors: {}
      | },
      | bucket_size: 0
      | }
      |)
      |""")

  // Order matters: types must exist before the types and table that reference them.
  val setupStatements = Seq(
    createCategoryMetadataType,
    createObjectMetadataType,
    createRelationType,
    createObjectsTable,
    insertObject)

  conn.withSessionDo { session =>
    setupStatements.foreach(statement => session.execute(statement))
  }

  val sqlContext = new CassandraSQLContext(sc)
  sqlContext.setKeyspace("sql_test")

  val sourceOptions = Map("c_table" -> "objects", "keyspace" -> "sql_test")
  val rows = sqlContext.load("org.apache.spark.sql.cassandra", options = sourceOptions).collect()

  // Loading and collecting the table must succeed and return the one inserted row.
  rows should have length 1
}

// Regression test for #454: java.util.NoSuchElementException thrown when accessing timestamp field using CassandraSQLContext
it should "allow to restrict a clustering timestamp column value" in {
conn.withSessionDo { session =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ object DataTypeConverter extends Logging {
StructField(field.columnName, catalystDataType(field.columnType, nullable = true), nullable = true)

cassandraType match {
case connector.types.SetType(et) => catalystTypes.ArrayType(primitiveTypeMap(et), nullable)
case connector.types.ListType(et) => catalystTypes.ArrayType(primitiveTypeMap(et), nullable)
case connector.types.MapType(kt, vt) => catalystTypes.MapType(primitiveTypeMap(kt), primitiveTypeMap(vt), nullable)
case connector.types.SetType(et) => catalystTypes.ArrayType(catalystDataType(et, nullable), nullable)
case connector.types.ListType(et) => catalystTypes.ArrayType(catalystDataType(et, nullable), nullable)
case connector.types.MapType(kt, vt) => catalystTypes.MapType(catalystDataType(kt, nullable), catalystDataType(vt, nullable), nullable)
case connector.types.UserDefinedType(_, fields) => catalystTypes.StructType(fields.map(catalystStructField))
case connector.types.VarIntType =>
logWarning("VarIntType is mapped to catalystTypes.DecimalType with unlimited values.")
Expand Down

0 comments on commit 1c02771

Please sign in to comment.