
Merge remote-tracking branch 'delta/master' into fix-strip-view-merge
johanl-db committed Sep 25, 2024
2 parents 7c272c2 + 37cc821 commit ea1f2a1
Showing 129 changed files with 2,906 additions and 1,293 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/spark_python_test.yaml
@@ -61,7 +61,7 @@ jobs:
# `-SNAPSHOT` in version (e.g. `3.3.0-SNAPSHOT`) as the version is picked up from
# the`version.sbt` file.
pipenv run pip install pip==24.0 setuptools==69.5.1 wheel==0.43.0
-pipenv run pip install pyspark==3.5.0
+pipenv run pip install pyspark==3.5.3
pipenv run pip install flake8==3.5.0 pypandoc==1.3.3
pipenv run pip install black==23.9.1
pipenv run pip install importlib_metadata==3.10.0
2 changes: 1 addition & 1 deletion .github/workflows/spark_test.yaml
@@ -65,7 +65,7 @@ jobs:
# `-SNAPSHOT` in version (e.g. `3.3.0-SNAPSHOT`) as the version is picked up from
# the`version.sbt` file.
pipenv run pip install pip==24.0 setuptools==69.5.1 wheel==0.43.0
-pipenv run pip install pyspark==3.5.2
+pipenv run pip install pyspark==3.5.3
pipenv run pip install flake8==3.5.0 pypandoc==1.3.3
pipenv run pip install black==23.9.1
pipenv run pip install importlib_metadata==3.10.0
2 changes: 1 addition & 1 deletion Dockerfile
@@ -38,7 +38,7 @@ RUN pip3 install --upgrade pip
# the`version.sbt` file.
RUN pip install pip==24.0 setuptools==69.5.1 wheel==0.43.0

-RUN pip3 install pyspark==3.5.2
+RUN pip3 install pyspark==3.5.3

RUN pip3 install mypy==0.982

2 changes: 1 addition & 1 deletion benchmarks/build.sbt
@@ -20,7 +20,7 @@ scalaVersion := "2.12.18"
lazy val root = (project in file("."))
.settings(
name := "benchmarks",
-libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.2" % "provided",
+libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.3" % "provided",
libraryDependencies += "com.github.scopt" %% "scopt" % "4.0.1",
libraryDependencies += "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.13.1",

8 changes: 5 additions & 3 deletions build.sbt
@@ -52,7 +52,7 @@ val all_scala_versions = Seq(scala212, scala213)
val default_scala_version = settingKey[String]("Default Scala version")
Global / default_scala_version := scala212

-val LATEST_RELEASED_SPARK_VERSION = "3.5.2"
+val LATEST_RELEASED_SPARK_VERSION = "3.5.3"
val SPARK_MASTER_VERSION = "4.0.0-SNAPSHOT"
val sparkVersion = settingKey[String]("Spark version")
spark / sparkVersion := getSparkVersion()
@@ -176,6 +176,7 @@ def crossSparkSettings(): Seq[Setting[_]] = getSparkVersion() match {
Compile / unmanagedSourceDirectories += (Compile / baseDirectory).value / "src" / "main" / "scala-spark-3.5",
Test / unmanagedSourceDirectories += (Test / baseDirectory).value / "src" / "test" / "scala-spark-3.5",
Antlr4 / antlr4Version := "4.9.3",
+Test / javaOptions ++= Seq("-Dlog4j.configurationFile=log4j2.properties"),

// Java-/Scala-/Uni-Doc Settings
scalacOptions ++= Seq(
@@ -204,8 +205,9 @@ def crossSparkSettings(): Seq[Setting[_]] = getSparkVersion() match {
"--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
"--add-opens=java.base/sun.nio.cs=ALL-UNNAMED",
"--add-opens=java.base/sun.security.action=ALL-UNNAMED",
-"--add-opens=java.base/sun.util.calendar=ALL-UNNAMED"
-)
+"--add-opens=java.base/sun.util.calendar=ALL-UNNAMED",
+"-Dlog4j.configurationFile=log4j2_spark_master.properties"
+),

// Java-/Scala-/Uni-Doc Settings
// This isn't working yet against Spark Master.
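Note (illustrative, not part of this commit): the two javaOptions entries added above point the test JVMs at Log4j 2 configuration files (log4j2.properties for released Spark, log4j2_spark_master.properties for Spark master). Those files are not shown in this page; purely as a hedged sketch of the format they reference, a minimal Log4j 2 properties configuration could look like the following, where the appender name, level, and pattern are placeholder assumptions rather than the repository's actual settings:

    # placeholder Log4j 2 config; values are assumptions, not the repo's real file
    rootLogger.level = info
    rootLogger.appenderRef.stdout.ref = console

    appender.console.type = Console
    appender.console.name = console
    appender.console.target = SYSTEM_ERR
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n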
16 changes: 0 additions & 16 deletions connectors/.github/workflows/new_pull_request.yaml

This file was deleted.

19 changes: 0 additions & 19 deletions connectors/.github/workflows/new_updated_issue.yaml

This file was deleted.

43 changes: 0 additions & 43 deletions connectors/.github/workflows/test.yaml

This file was deleted.

20 changes: 0 additions & 20 deletions connectors/.github/workflows/updated_pull_request.yaml

This file was deleted.

38 changes: 0 additions & 38 deletions connectors/dev/README.md

This file was deleted.

2 changes: 1 addition & 1 deletion docs/environment.yml
@@ -33,7 +33,7 @@ dependencies:
- packaging==23.2
- py4j==0.10.9.7
- pygments==2.16.1
-- pyspark==3.5.2
+- pyspark==3.5.3
- pytz==2023.3.post1
- requests==2.31.0
- six==1.16.0
2 changes: 1 addition & 1 deletion examples/scala/build.sbt
@@ -46,7 +46,7 @@ val lookupSparkVersion: PartialFunction[(Int, Int), String] = {
// version 4.0.0-preview1
case (major, minor) if major >= 4 => "4.0.0-preview1"
// versions 3.3.x+
-case (major, minor) if major >= 3 && minor >=3 => "3.5.2"
+case (major, minor) if major >= 3 && minor >=3 => "3.5.3"
// versions 3.0.0 to 3.2.x
case (major, minor) if major >= 3 && minor <=2 => "3.5.0"
// versions 2.4.x
@@ -255,7 +255,7 @@ class IcebergConversionTransaction(
}

def getExpireSnapshotHelper(): ExpireSnapshotHelper = {
-val ret = new ExpireSnapshotHelper(txn.expireSnapshots().cleanExpiredFiles(false))
+val ret = new ExpireSnapshotHelper(txn.expireSnapshots())
fileUpdates += ret
ret
}
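Note (illustrative, not part of this commit): assuming Iceberg's standard ExpireSnapshots API, where cleanExpiredFiles defaults to true, the difference between the old and new helper above can be sketched as two small functions over a shaded Iceberg Transaction. The shadedForDelta package prefix is taken from the test code elsewhere in this commit and assumed to apply here.

    object ExpireSnapshotsSketch {
      import shadedForDelta.org.apache.iceberg.{ExpireSnapshots, Transaction}

      // Old helper behavior: expire snapshot metadata but explicitly keep data and
      // manifest files that are only referenced by the expired snapshots.
      def expireKeepingFiles(txn: Transaction): ExpireSnapshots =
        txn.expireSnapshots().cleanExpiredFiles(false)

      // New helper behavior: rely on the API default (cleanExpiredFiles = true), so
      // files no longer referenced by any live snapshot become eligible for cleanup.
      def expireWithCleanup(txn: Transaction): ExpireSnapshots =
        txn.expireSnapshots()
    }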
@@ -25,11 +25,9 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkContext
import org.apache.spark.sql.{QueryTest, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogStorageFormat}
import org.apache.spark.sql.delta.actions.Metadata
-import org.apache.spark.sql.delta.icebergShaded.IcebergTransactionUtils
-import org.apache.spark.sql.delta.sources.DeltaSQLConf
-import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructType, StructField}
import org.apache.spark.util.Utils

/**
@@ -111,7 +109,6 @@ class ConvertToIcebergSuite extends QueryTest with Eventually {
runDeltaSql(
s"""CREATE TABLE `${testTableName}` (col1 INT) USING DELTA
|TBLPROPERTIES (
-| 'delta.enableIcebergCompatV2' = 'true',
| 'delta.columnMapping.mode' = 'name',
| 'delta.universalFormat.enabledFormats' = 'iceberg'
|)""".stripMargin)
@@ -126,85 +123,21 @@ class ConvertToIcebergSuite extends QueryTest with Eventually {
withDefaultTablePropsInSQLConf {
deltaSpark.range(10).write.format("delta")
.option("path", testTablePath)
-.option("delta.enableIcebergCompatV2", "true")
.saveAsTable(testTableName)
}
}
withDeltaSparkSession { deltaSpark =>
deltaSpark.range(10, 20, 1)
.write.format("delta").mode("append")
.option("path", testTablePath)
-.option("delta.enableIcebergCompatV2", "true")
.saveAsTable(testTableName)
}
verifyReadWithIceberg(testTableName, 0 to 19 map (Row(_)))
}
}

-test("Expire Snapshots") {
-if (hmsReady(PORT)) {
-runDeltaSql(
-s"""CREATE TABLE `${testTableName}` (col1 INT) USING DELTA
-|TBLPROPERTIES (
-| 'delta.enableIcebergCompatV2' = 'true',
-| 'delta.columnMapping.mode' = 'name',
-| 'delta.universalFormat.enabledFormats' = 'iceberg'
-|)""".stripMargin)
-
-val icebergTable = loadIcebergTable()
-icebergTable.updateProperties().set("history.expire.max-snapshot-age-ms", "1").commit()
-
-for (i <- 0 to 7) {
-runDeltaSql(s"INSERT INTO ${testTableName} VALUES (${i})",
-DeltaSQLConf.DELTA_UNIFORM_ICEBERG_SYNC_CONVERT_ENABLED.key -> "true")
-}
-
-// Sleep past snapshot retention duration
-Thread.sleep(5)
-withIcebergSparkSession { icebergSpark => {
-icebergSpark.sql(s"REFRESH TABLE $testTableName")
-val manifestListsBeforeExpiration = icebergSpark
-.sql(s"SELECT * FROM default.${testTableName}.snapshots")
-.select("manifest_list")
-.collect()
-
-assert(manifestListsBeforeExpiration.length == 8)
-
-// Trigger snapshot expiration
-runDeltaSql(s"OPTIMIZE ${testTableName}")
-icebergSpark.sql(s"REFRESH TABLE $testTableName")
-
-val manifestListsAfterExpiration = icebergSpark
-.sql(s"SELECT * FROM default.${testTableName}.snapshots")
-.select("manifest_list")
-.collect()
-
-assert(manifestListsAfterExpiration.length == 1)
-// Manifests from earlier snapshots should not be removed
-manifestListsBeforeExpiration.toStream.foreach(
-manifestList => assert(
-icebergTable.io().newInputFile(manifestList.get(0).asInstanceOf[String]).exists()))
-}}
-}
-}
-
-private def loadIcebergTable(): shadedForDelta.org.apache.iceberg.Table = {
-withDeltaSparkSession { deltaSpark => {
-val log = DeltaLog.forTable(deltaSpark, testTablePath)
-val hiveCatalog = IcebergTransactionUtils.createHiveCatalog(
-log.newDeltaHadoopConf()
-)
-val table = hiveCatalog.loadTable(
-shadedForDelta.org.apache.iceberg.catalog.TableIdentifier
-.of("default", testTableName)
-)
-table
-}}
-}
-
-def runDeltaSql(sqlStr: String, conf: (String, String)*): Unit = {
+def runDeltaSql(sqlStr: String): Unit = {
withDeltaSparkSession { deltaSpark =>
-conf.foreach(c => deltaSpark.conf.set(c._1, c._2))
deltaSpark.sql(sqlStr)
}
}
