From 90a3ca5482700c939bca1baaca1b21f58458cdd0 Mon Sep 17 00:00:00 2001 From: wankunde Date: Fri, 2 Nov 2018 17:41:12 +0800 Subject: [PATCH] Add support for spark configuration spark.sql.caseSensitive --- .../spark/connector/DataFrameFunctions.scala | 12 +++++++++++- .../com/datastax/spark/connector/cql/Schema.scala | 5 +++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/DataFrameFunctions.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/DataFrameFunctions.scala index 76d2fdfca..b5faef039 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/DataFrameFunctions.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/DataFrameFunctions.scala @@ -3,6 +3,8 @@ package com.datastax.spark.connector import com.datastax.spark.connector.cql._ import org.apache.spark.SparkContext import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{StructField, StructType} /** Provides Cassandra-specific methods on [[org.apache.spark.sql.DataFrame]] */ class DataFrameFunctions(dataFrame: DataFrame) extends Serializable { @@ -26,7 +28,15 @@ class DataFrameFunctions(dataFrame: DataFrame) extends Serializable { val protocolVersion = connector. withClusterDo(_.getConfiguration.getProtocolOptions.getProtocolVersion) - val rawTable = TableDef.fromDataFrame(dataFrame, keyspaceName, tableName, protocolVersion) + val caseSensitive = sparkContext.getConf.getBoolean(SQLConf.CASE_SENSITIVE.key, false) + val schema = if(!caseSensitive) { + new StructType( + dataFrame.schema.fields.map( + field => StructField(field.name.toLowerCase, field.dataType, field.nullable))) + } else + dataFrame.schema + + val rawTable = TableDef.fromDataFrame(schema, keyspaceName, tableName, protocolVersion) val columnMapping = rawTable.columnByName val columnNames = columnMapping.keys.toSet diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala index 6135421e0..3d25ac80e 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala @@ -14,6 +14,7 @@ import com.datastax.spark.connector.types.{ColumnType, CounterType} import com.datastax.spark.connector.util.NameTools import com.datastax.spark.connector.util.Quote._ import com.datastax.spark.connector.util.Logging +import org.apache.spark.sql.types.StructType /** Abstract column / field definition. * Common to tables and user-defined types */ @@ -217,12 +218,12 @@ object TableDef { implicitly[ColumnMapper[T]].newTable(keyspaceName, tableName, protocolVersion) def fromDataFrame( - dataFrame: DataFrame, + schema: StructType, keyspaceName: String, tableName: String, protocolVersion: ProtocolVersion): TableDef = - new DataFrameColumnMapper(dataFrame.schema).newTable(keyspaceName, tableName, protocolVersion) + new DataFrameColumnMapper(schema).newTable(keyspaceName, tableName, protocolVersion) } /** A Cassandra keyspace metadata that can be serialized. */