Add type widening delta sharing tests
johanl-db committed Sep 13, 2024
1 parent 1aaf10b commit ae98b87
Showing 3 changed files with 165 additions and 17 deletions.
DeltaSharingUtils.scala
@@ -22,15 +22,7 @@ import java.util.{TimeZone, UUID}

import scala.reflect.ClassTag

-import org.apache.spark.sql.delta.{
-  ColumnMappingTableFeature,
-  DeletionVectorsTableFeature,
-  DeltaLog,
-  DeltaParquetFileFormat,
-  SnapshotDescriptor,
-  TimestampNTZTableFeature
-}
-import org.apache.spark.sql.delta.VariantTypeTableFeature
+import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
import com.google.common.hash.Hashing
import io.delta.sharing.client.{DeltaSharingClient, DeltaSharingRestClient}
@@ -51,13 +43,18 @@ object DeltaSharingUtils extends Logging {
DeletionVectorsTableFeature.name,
ColumnMappingTableFeature.name,
TimestampNTZTableFeature.name,
+    TypeWideningPreviewTableFeature.name,
+    TypeWideningTableFeature.name,
VariantTypeTableFeature.name
)

val SUPPORTED_READER_FEATURES: Seq[String] =
Seq(
DeletionVectorsTableFeature.name,
ColumnMappingTableFeature.name,
TimestampNTZTableFeature.name,
+      TypeWideningPreviewTableFeature.name,
+      TypeWideningTableFeature.name,
VariantTypeTableFeature.name
)

DeltaSharingDataSourceTypeWideningSuite.scala (new file)
@@ -0,0 +1,147 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.sharing.spark

import org.apache.spark.sql.delta.DeltaConfigs
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, QueryTest}
import org.apache.spark.sql.delta.sharing.DeltaSharingTestSparkUtils
import org.apache.spark.sql.types._

// Unit tests to verify that type widening works with delta sharing.
class DeltaSharingDataSourceTypeWideningSuite
extends QueryTest
with DeltaSQLCommandTest
with DeltaSharingTestSparkUtils
with DeltaSharingDataSourceDeltaTestUtils {

import testImplicits._

protected override def sparkConf: SparkConf = {
super.sparkConf
.set(DeltaConfigs.ENABLE_TYPE_WIDENING.defaultTablePropertyKey, true.toString)
}
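
  // Note: `defaultTablePropertyKey` is the Spark session conf that supplies the default
  // for the `delta.enableTypeWidening` table property, so every table created by this
  // suite has type widening enabled without an explicit TBLPROPERTIES clause.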

/** Sets up delta sharing mocks to read a table and validates results. */
private def testReadingDeltaShare(
tableName: String,
versionAsOf: Option[Long],
responseFormat: String,
expectedSchema: StructType,
expectedResult: DataFrame): Unit = {
withTempDir { tempDir =>
val sharedTableName =
if (responseFormat == DeltaSharingOptions.RESPONSE_FORMAT_DELTA) {
"type_widening_shared_delta_table"
} else {
"type_widening_shared_parquet_table"
}
prepareMockedClientMetadata(tableName, sharedTableName)
prepareMockedClientGetTableVersion(tableName, sharedTableName, versionAsOf)
if (responseFormat == DeltaSharingOptions.RESPONSE_FORMAT_DELTA) {
prepareMockedClientAndFileSystemResult(tableName, sharedTableName, versionAsOf)
} else {
assert(responseFormat == DeltaSharingOptions.RESPONSE_FORMAT_PARQUET)
prepareMockedClientAndFileSystemResultForParquet(tableName, sharedTableName, versionAsOf)
}

var reader = spark.read
.format("deltaSharing")
.option("responseFormat", responseFormat)
versionAsOf.foreach { version =>
reader = reader.option("versionAsOf", version)
}

withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) {
val profileFile = prepareProfileFile(tempDir)
val result = reader.load(s"${profileFile.getCanonicalPath}#share1.default.$sharedTableName")
assert(result.schema === expectedSchema)
checkAnswer(result, expectedResult)
}
}
}

/** Creates a table and applies a type change to it. */
private def withTestTable(testBody: String => Unit): Unit = {
val deltaTableName = "type_widening_table"
withTable(deltaTableName) {
sql(s"CREATE TABLE $deltaTableName (value SMALLINT) USING DELTA")
sql(s"INSERT INTO $deltaTableName VALUES (1), (2)")
sql(s"ALTER TABLE $deltaTableName CHANGE COLUMN value TYPE INT")
sql(s"INSERT INTO $deltaTableName VALUES (3), (${Int.MaxValue})")
sql(s"INSERT INTO $deltaTableName VALUES (4), (5)")
testBody(deltaTableName)
}
}

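  // Version history produced by `withTestTable`, relied on by the time travel tests below:
  //   version 0: CREATE TABLE (value SMALLINT)
  //   version 1: INSERT (1), (2)
  //   version 2: ALTER TABLE ... CHANGE COLUMN value TYPE INT
  //   version 3: INSERT (3), (Int.MaxValue)
  //   version 4: INSERT (4), (5)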
/** Short-hand for the type widening metadata for column `value` for the test table above. */
private val typeWideningMetadata: Metadata =
new MetadataBuilder()
.putMetadataArray(
"delta.typeChanges", Array(
new MetadataBuilder()
.putLong("tableVersion", 2)
.putString("fromType", "short")
.putString("toType", "integer")
.build()))
.build()

  for (responseFormat <- Seq(DeltaSharingOptions.RESPONSE_FORMAT_DELTA,
      DeltaSharingOptions.RESPONSE_FORMAT_PARQUET)) {
    test(s"Delta sharing with type widening, responseFormat=$responseFormat") {
      withTestTable { tableName =>
        testReadingDeltaShare(
          tableName,
          versionAsOf = None,
          responseFormat,
          expectedSchema = new StructType()
            .add("value", IntegerType, nullable = true, metadata = typeWideningMetadata),
          expectedResult = Seq(1, 2, 3, Int.MaxValue, 4, 5).toDF("value"))
      }
    }

    test(s"Delta sharing with type widening, time travel, responseFormat=$responseFormat") {
      withTestTable { tableName =>
        testReadingDeltaShare(
          tableName,
          versionAsOf = Some(3),
          responseFormat,
          expectedSchema = new StructType()
            .add("value", IntegerType, nullable = true, metadata = typeWideningMetadata),
          expectedResult = Seq(1, 2, 3, Int.MaxValue).toDF("value"))

        testReadingDeltaShare(
          tableName,
          versionAsOf = Some(2),
          responseFormat,
          expectedSchema = new StructType()
            .add("value", IntegerType, nullable = true, metadata = typeWideningMetadata),
          expectedResult = Seq(1, 2).toDF("value"))

        testReadingDeltaShare(
          tableName,
          versionAsOf = Some(1),
          responseFormat,
          expectedSchema = new StructType()
            .add("value", ShortType),
          expectedResult = Seq(1, 2).toDF("value"))
      }
    }
  }
}
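
A note on the schema assertions above: the widening history travels with the column as Spark
column metadata under the `delta.typeChanges` key (version of the change, source type, target
type). A minimal sketch of how a recipient could inspect it, assuming `df` stands for the
DataFrame returned by `reader.load(...)` in the helper:

  import org.apache.spark.sql.types.StructField

  // Lists the recorded type changes of a field; for the `value` column of the test
  // table above this yields "short -> integer at version 2".
  def describeTypeChanges(field: StructField): Seq[String] =
    if (field.metadata.contains("delta.typeChanges")) {
      field.metadata.getMetadataArray("delta.typeChanges").toSeq.map { change =>
        s"${change.getString("fromType")} -> ${change.getString("toType")} " +
          s"at version ${change.getLong("tableVersion")}"
      }
    } else Seq.empty

  df.schema.fields.filter(_.name == "value").flatMap(describeTypeChanges).foreach(println)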
TestClientForDeltaFormatSharing.scala
@@ -18,6 +18,7 @@ package io.delta.sharing.spark

import scala.collection.mutable.ArrayBuffer

+import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.util.JsonUtils
import io.delta.sharing.client.{
DeltaSharingClient,
@@ -59,16 +60,19 @@ private[spark] class TestClientForDeltaFormatSharing(
tokenRenewalThresholdInSeconds: Int = 600)
extends DeltaSharingClient {

+  private val supportedReaderFeatures: Seq[String] = Seq(
+    DeletionVectorsTableFeature,
+    ColumnMappingTableFeature,
+    TimestampNTZTableFeature,
+    TypeWideningPreviewTableFeature,
+    TypeWideningTableFeature
+  ).map(_.name)
+
  assert(
    responseFormat == DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET ||
-      (
-        readerFeatures.contains("deletionVectors") &&
-        readerFeatures.contains("columnMapping") &&
-        readerFeatures.contains("timestampNtz") &&
-        readerFeatures.contains("variantType-preview")
-      ),
-    "deletionVectors, columnMapping, timestampNtz, variantType-preview should be supported in " +
-      "all types of queries."
+      supportedReaderFeatures.forall(readerFeatures.split(",").contains),
+    s"${supportedReaderFeatures.diff(readerFeatures.split(",")).mkString(", ")} " +
+      s"should be supported in all types of queries."
  )

import TestClientForDeltaFormatSharing._

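The rewritten assertion generalizes the old hardcoded check: every feature the test client
declares as supported must appear in the comma-separated `readerFeatures` string, and the
failure message now names exactly the missing features instead of repeating a fixed list. A
standalone sketch of the pattern (the feature strings are illustrative `<feature>.name` values):

  val readerFeatures = "deletionVectors,columnMapping,timestampNtz"
  val supportedReaderFeatures = Seq("deletionVectors", "columnMapping", "typeWidening")
  val missing = supportedReaderFeatures.diff(readerFeatures.split(","))
  // Fails with "typeWidening should be supported in all types of queries."
  assert(missing.isEmpty, s"${missing.mkString(", ")} should be supported in all types of queries.")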