
Commit

Merge remote-tracking branch 'upstream/master' into support-clean-expire-log

# Conflicts:
#	kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java
horizonzy committed Jun 27, 2024
2 parents a01e834 + eb26989 commit 46f2f8e
Showing 297 changed files with 16,706 additions and 5,398 deletions.
1 change: 1 addition & 0 deletions .github/workflows/spark_master_test.yaml
@@ -47,4 +47,5 @@ jobs:
# when changing TEST_PARALLELISM_COUNT make sure to also change it in spark_test.yaml
run: |
TEST_PARALLELISM_COUNT=2 build/sbt -DsparkVersion=master "++ ${{ matrix.scala }}" clean spark/test
TEST_PARALLELISM_COUNT=2 build/sbt -DsparkVersion=master "++ ${{ matrix.scala }}" clean connectServer/test
if: steps.git-diff.outputs.diff
12 changes: 12 additions & 0 deletions .github/workflows/spark_test.yaml
@@ -41,6 +41,10 @@ jobs:
sudo apt-get update
sudo apt-get install -y make build-essential libssl-dev zlib1g-dev libbz2-dev libreadline-dev libsqlite3-dev wget curl llvm libncurses5-dev libncursesw5-dev xz-utils tk-dev libffi-dev liblzma-dev python-openssl git
sudo apt install libedit-dev
curl -LO https://github.com/bufbuild/buf/releases/download/v1.28.1/buf-Linux-x86_64.tar.gz
mkdir -p ~/buf
tar -xvzf buf-Linux-x86_64.tar.gz -C ~/buf --strip-components 1
rm buf-Linux-x86_64.tar.gz
sudo apt install python3-pip --fix-missing
sudo pip3 install pipenv==2021.5.29
curl https://pyenv.run | bash
@@ -50,10 +54,18 @@
pyenv install 3.8.18
pyenv global system 3.8.18
pipenv --python 3.8 install
# Update the pip version to 24.0. By default `pyenv.run` installs the latest pip version
# available. From version 24.1, `pip` doesn't allow installing python packages
# with a version string containing `-`. In the Delta-Spark case, the generated pypi package has
# `-SNAPSHOT` in its version (e.g. `3.3.0-SNAPSHOT`) because the version is picked up from
# the `version.sbt` file.
pipenv run pip install pip==24.0 setuptools==69.5.1 wheel==0.43.0
pipenv run pip install pyspark==3.5.0
pipenv run pip install flake8==3.5.0 pypandoc==1.3.3
pipenv run pip install black==23.9.1
pipenv run pip install importlib_metadata==3.10.0
pipenv run pip install mypy==0.982
pipenv run pip install mypy-protobuf==3.3.0
pipenv run pip install cryptography==37.0.4
pipenv run pip install twine==4.0.1
pipenv run pip install wheel==0.33.4
8 changes: 5 additions & 3 deletions PROTOCOL.md
@@ -477,7 +477,9 @@ The following is an example `remove` action.
```

### Add CDC File
The `cdc` action is used to add a [file](#change-data-files) containing only the data that was changed as part of the transaction. When change data readers encounter a `cdc` action in a particular Delta table version, they must read the changes made in that version exclusively using the `cdc` files. If a version has no `cdc` action, then the data in `add` and `remove` actions are read as inserted and deleted rows, respectively.
The `cdc` action is used to add a [file](#change-data-files) containing only the data that was changed as part of the transaction. The `cdc` action is allowed to add a [Data File](#data-files) that is also added by an `add` action, when it does not contain any copied rows and the `_change_type` column is filled for all rows.

When change data readers encounter a `cdc` action in a particular Delta table version, they must read the changes made in that version exclusively using the `cdc` files. If a version has no `cdc` action, then the data in `add` and `remove` actions are read as inserted and deleted rows, respectively.
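
For illustration, a `cdc` action might look like the following (a hypothetical sketch; the file path and size are invented):

```
{
  "cdc": {
    "path": "_change_data/cdc-00001-c.snappy.parquet",
    "partitionValues": {},
    "size": 1213,
    "dataChange": false
  }
}
```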

The schema of the `cdc` action is as follows:

@@ -523,7 +525,7 @@ Specifically, to read the row-level changes made in a version, the following str

##### Note for non-change data readers

In a table with Change Data Feed enabled, the data Parquet files referenced by `add` and `remove` actions are allowed to contain an extra column `_change_type`. This column is not present in the table's schema and will consistently have a `null` value. When accessing these files, readers should disregard this column and only process columns defined within the table's schema.
In a table with Change Data Feed enabled, the data Parquet files referenced by `add` and `remove` actions are allowed to contain an extra column `_change_type`. This column is not present in the table's schema. When accessing these files, readers should disregard this column and only process columns defined within the table's schema.

### Transaction Identifiers
Incremental processing systems (e.g., streaming systems) that track progress using their own application-specific versions need to record what progress has been made, in order to avoid duplicating data in the face of failures and retries during a write.
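
For example, a streaming writer might record its progress with a `txn` action such as the following (a hypothetical sketch; the application id and version are invented):

```
{
  "txn": {
    "appId": "streaming-job-1",
    "version": 42
  }
}
```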
@@ -1708,7 +1710,7 @@ numRecords | The number of records in this data file.
tightBounds | Whether per-column statistics are currently **tight** or **wide** (see below).

For any logical file where `deletionVector` is not `null`, the `numRecords` statistic *must* be present and accurate. That is, it must equal the number of records in the data file, not the valid records in the logical file.
In the presence of [Deletion Vectors](#Deletion-Vectors) the statistics may be somewhat outdated, i.e. not reflecting deleted rows yet. The flag `stats.tightBounds` indicates whether we have **tight bounds** (i.e. the min/maxValue exists[^1] in the valid state of the file) or **wide bounds** (i.e. the minValue is <= all valid values in the file, and the maxValue >= all valid values in the file). These upper/lower bounds are sufficient information for data skipping.
In the presence of [Deletion Vectors](#Deletion-Vectors) the statistics may be somewhat outdated, i.e. not reflecting deleted rows yet. The flag `stats.tightBounds` indicates whether we have **tight bounds** (i.e. the min/maxValue exists[^1] in the valid state of the file) or **wide bounds** (i.e. the minValue is <= all valid values in the file, and the maxValue >= all valid values in the file). These upper/lower bounds are sufficient information for data skipping. Note, `stats.tightBounds` should be treated as `true` when it is not explicitly present in the statistics.
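
As a hedged illustration (the column name and values are invented), a file whose rows are partially deleted by a deletion vector might carry wide-bounds statistics such as:

```
{
  "numRecords": 1000,
  "tightBounds": false,
  "minValues": { "id": 0 },
  "maxValues": { "id": 999 }
}
```

Here `numRecords` still counts all 1000 physical rows in the data file, and the rows holding `id` 0 or 999 may themselves be deleted; the bounds only guarantee that every remaining valid `id` lies within [0, 999], which is sufficient for data skipping.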

Per-column statistics record information for each column in the file and they are encoded, mirroring the schema of the actual data.
For example, given the following data schema:
114 changes: 112 additions & 2 deletions build.sbt
@@ -17,6 +17,12 @@
// scalastyle:off line.size.limit

import java.nio.file.Files

import sbt.internal.inc.Analysis
import sbtprotoc.ProtocPlugin.autoImport._

import xsbti.compile.CompileAnalysis

import Checkstyle._
import Mima._
import Unidoc._
@@ -39,6 +45,8 @@ val LATEST_RELEASED_SPARK_VERSION = "3.5.0"
val SPARK_MASTER_VERSION = "4.0.0-SNAPSHOT"
val sparkVersion = settingKey[String]("Spark version")
spark / sparkVersion := getSparkVersion()
connectCommon / sparkVersion := getSparkVersion()
connectServer / sparkVersion := getSparkVersion()

// Dependent library versions
val defaultSparkVersion = LATEST_RELEASED_SPARK_VERSION
@@ -58,6 +66,9 @@ val hadoopVersionForHive2 = "2.7.2"
val hive2Version = "2.3.3"
val tezVersionForHive2 = "0.8.4"

val protoVersion = "3.25.1"
val grpcVersion = "1.62.2"

scalaVersion := default_scala_version.value

// crossScalaVersions must be set to Nil on the root project
@@ -187,6 +198,105 @@ def crossSparkSettings(): Seq[Setting[_]] = getSparkVersion() match {
)
}

def runTaskOnlyOnSparkMaster[T](
task: sbt.TaskKey[T],
taskName: String,
projectName: String,
emptyValue: => T): Def.Initialize[Task[T]] = {
if (getSparkVersion() == SPARK_MASTER_VERSION) {
Def.task(task.value)
} else {
Def.task {
// scalastyle:off println
println(s"Project $projectName: Skipping `$taskName` as Spark version " +
s"${getSparkVersion()} does not equal $SPARK_MASTER_VERSION.")
// scalastyle:on println
emptyValue
}
}
}

lazy val connectCommon = (project in file("spark-connect/common"))
.settings(
name := "delta-connect-common",
commonSettings,
crossSparkSettings(),
releaseSettings,
Compile / compile := runTaskOnlyOnSparkMaster(
task = Compile / compile,
taskName = "compile",
projectName = "delta-connect-common",
emptyValue = Analysis.empty.asInstanceOf[CompileAnalysis]
).value,
Test / test := runTaskOnlyOnSparkMaster(
task = Test / test,
taskName = "test",
projectName = "delta-connect-common",
emptyValue = ()).value,
publish := runTaskOnlyOnSparkMaster(
task = publish,
taskName = "publish",
projectName = "delta-connect-common",
emptyValue = ()).value,
libraryDependencies ++= Seq(
"io.grpc" % "protoc-gen-grpc-java" % grpcVersion asProtocPlugin(),
"io.grpc" % "grpc-protobuf" % grpcVersion,
"io.grpc" % "grpc-stub" % grpcVersion,
"com.google.protobuf" % "protobuf-java" % protoVersion % "protobuf",
"javax.annotation" % "javax.annotation-api" % "1.3.2",

"org.apache.spark" %% "spark-connect-common" % sparkVersion.value % "provided",
),
PB.protocVersion := protoVersion,
Compile / PB.targets := Seq(
PB.gens.java -> (Compile / sourceManaged).value,
PB.gens.plugin("grpc-java") -> (Compile / sourceManaged).value
),
)

lazy val connectServer = (project in file("spark-connect/server"))
.dependsOn(connectCommon % "compile->compile;test->test;provided->provided")
.dependsOn(spark % "compile->compile;test->test;provided->provided")
.settings(
name := "delta-connect-server",
commonSettings,
releaseSettings,
Compile / compile := runTaskOnlyOnSparkMaster(
task = Compile / compile,
taskName = "compile",
projectName = "delta-connect-server",
emptyValue = Analysis.empty.asInstanceOf[CompileAnalysis]
).value,
Test / test := runTaskOnlyOnSparkMaster(
task = Test / test,
taskName = "test",
projectName = "delta-connect-server",
emptyValue = ()
).value,
publish := runTaskOnlyOnSparkMaster(
task = publish,
taskName = "publish",
projectName = "delta-connect-server",
emptyValue = ()
).value,
crossSparkSettings(),
libraryDependencies ++= Seq(
"com.google.protobuf" % "protobuf-java" % protoVersion % "protobuf",

"org.apache.spark" %% "spark-hive" % sparkVersion.value % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided",
"org.apache.spark" %% "spark-core" % sparkVersion.value % "provided",
"org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "provided",
"org.apache.spark" %% "spark-connect" % sparkVersion.value % "provided",

"org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-hive" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-connect" % sparkVersion.value % "test" classifier "tests",
),
)

lazy val spark = (project in file("spark"))
.dependsOn(storage)
.enablePlugins(Antlr4Plugin)
@@ -319,7 +429,7 @@ lazy val sharing = (project in file("sharing"))
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % defaultSparkVersion % "provided",

"io.delta" %% "delta-sharing-client" % "1.0.5",
"io.delta" %% "delta-sharing-client" % "1.1.0",

// Test deps
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
@@ -612,7 +722,7 @@ lazy val hudi = (project in file("hudi"))
scalaStyleSettings,
releaseSettings,
libraryDependencies ++= Seq(
"org.apache.hudi" % "hudi-java-client" % "0.14.0" % "compile" excludeAll(
"org.apache.hudi" % "hudi-java-client" % "0.15.0" % "compile" excludeAll(
ExclusionRule(organization = "org.apache.hadoop"),
ExclusionRule(organization = "org.apache.zookeeper"),
),
@@ -78,6 +78,7 @@ class KernelDeltaLogDelegator(
// called on a standard standalone snapshot.
val kernelSnapshotWrapper = new KernelSnapshotWrapper(kernelSnapshot)
currKernelSnapshot = Some(new KernelSnapshotDelegator(
engine,
kernelSnapshot,
kernelSnapshotWrapper,
hadoopConf,
@@ -18,6 +18,8 @@

package io.delta.standalone.internal

import io.delta.kernel.defaults.engine.DefaultEngine
import io.delta.kernel.engine.Engine
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

@@ -51,6 +53,7 @@ import io.delta.standalone.internal.util.ConversionUtils
* - allFilesScala (only used in verifySchemaCompatibility)
*/
class KernelSnapshotDelegator(
engine: Engine,
kernelSnapshot: SnapshotImplKernel,
// This needs to be an argument to the constructor since the constructor of SnapshotImpl might call back
// into things like `metadataScala`, and this needs to be already initialized for that
@@ -79,7 +82,7 @@ class KernelSnapshotDelegator(

// provide a path to use the faster txn lookup in kernel
def getLatestTransactionVersion(id: String): Option[Long] = {
val versionJOpt = kernelSnapshot.getLatestTransactionVersion(id)
val versionJOpt = kernelSnapshot.getLatestTransactionVersion(engine, id)
if (versionJOpt.isPresent) {
Some(versionJOpt.get)
} else {
@@ -0,0 +1,4 @@
{"commitInfo":{"timestamp":1717778521300,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"3","numOutputBytes":"9126"},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.3.0-SNAPSHOT","txnId":"5e3bfa16-cf0f-4d40-ad7d-b6426a6b4b7a"}}
{"metaData":{"id":"7f750aff-9bf2-4e52-bfce-39811932da26","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"decimal_4_0\",\"type\":\"decimal(4,0)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_7_0\",\"type\":\"decimal(7,0)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_7_6\",\"type\":\"decimal(7,6)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_12_0\",\"type\":\"decimal(12,0)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_12_6\",\"type\":\"decimal(12,6)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_15_0\",\"type\":\"decimal(15,0)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_15_6\",\"type\":\"decimal(15,6)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_15_12\",\"type\":\"decimal(15,12)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_18_0\",\"type\":\"decimal(18,0)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_18_6\",\"type\":\"decimal(18,6)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_18_12\",\"type\":\"decimal(18,12)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_25_0\",\"type\":\"decimal(25,0)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_25_6\",\"type\":\"decimal(25,6)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_25_12\",\"type\":\"decimal(25,12)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_25_18\",\"type\":\"decimal(25,18)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_25_24\",\"type\":\"decimal(25,24)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_35_0\",\"type\":\"decimal(35,0)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_35_6\",\"type\":\"decimal(35,6)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_35_12\",\"type\":\"decimal(35,12)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_35_18\",\"type\":\"decimal(35,18)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_35_24\",\"type\":\"decimal(35,24)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_35_30\",\"type\":\"decimal(35,30)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_38_0\",\"type\":\"decimal(38,0)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_38_6\",\"type\":\"decimal(38,6)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_38_12\",\"type\":\"decimal(38,12)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_38_18\",\"type\":\"decimal(38,18)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_38_24\",\"type\":\"decimal(38,24)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_38_30\",\"type\":\"decimal(38,30)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_38_36\",\"type\":\"decimal(38,36)\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1717778519308}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"add":{"path":"part-00000-bb4b3e59-ddb9-4d26-beaf-de9554e14517-c000.snappy.parquet","partitionValues":{},"size":9126,"modificationTime":1717778521237,"dataChange":true,"stats":"{\"numRecords\":3,\"minValues\":{\"decimal_4_0\":-13,\"decimal_7_0\":0,\"decimal_12_0\":0,\"decimal_12_6\":-0.000098,\"decimal_15_0\":-157,\"decimal_15_6\":-3.346000,\"decimal_15_12\":-0.002162000000,\"decimal_18_0\":0,\"decimal_18_6\":-22641.000000,\"decimal_18_12\":-5.190000000000,\"decimal_25_0\":0,\"decimal_25_6\":-0.000013,\"decimal_25_12\":-3.1661E-8,\"decimal_25_18\":-24199.000000000000000000,\"decimal_35_0\":0,\"decimal_35_6\":-0.000161,\"decimal_35_12\":-2.59176E-7,\"decimal_35_18\":-1.36744000E-10,\"decimal_35_24\":-22827907.000000000000000000000000,\"decimal_35_30\":-32805.309000000000000000000000000000,\"decimal_38_0\":-17,\"decimal_38_6\":-0.027994,\"decimal_38_12\":-0.000024695819,\"decimal_38_18\":-4.614771000E-9,\"decimal_38_24\":-9.718032000000E-12,\"decimal_38_30\":-2.6626087000000000E-14,\"decimal_38_36\":-2.9546424000000000000E-17},\"maxValues\":{\"decimal_4_0\":4,\"decimal_7_0\":0,\"decimal_12_0\":0,\"decimal_12_6\":0.000062,\"decimal_15_0\":481,\"decimal_15_6\":3.302000,\"decimal_15_12\":0.001469000000,\"decimal_18_0\":0,\"decimal_18_6\":7998.000000,\"decimal_18_12\":10.994000000000,\"decimal_25_0\":0,\"decimal_25_6\":0.000021,\"decimal_25_12\":5.925E-9,\"decimal_25_18\":234942.000000000000000000,\"decimal_35_0\":0,\"decimal_35_6\":0.000161,\"decimal_35_12\":1.65519E-7,\"decimal_35_18\":1.52896000E-10,\"decimal_35_24\":14797356.000000000000000000000000,\"decimal_35_30\":8083.687000000000000000000000000000,\"decimal_38_0\":26,\"decimal_38_6\":0.021882,\"decimal_38_12\":0.000032950993,\"decimal_38_18\":1.2783803000E-8,\"decimal_38_24\":2.395564000000E-12,\"decimal_38_30\":2.9414203000000000E-14,\"decimal_38_36\":3.241836000000000000E-18},\"nullCount\":{\"decimal_4_0\":1,\"decimal_7_0\":1,\"decimal_7_6\":3,\"decimal_12_0\":1,\"decimal_12_6\":1,\"decimal_15_0\":1,\"decimal_15_6\":1,\"decimal_15_12\":1,\"decimal_18_0\":1,\"decimal_18_6\":1,\"decimal_18_12\":1,\"decimal_25_0\":1,\"decimal_25_6\":1,\"decimal_25_12\":1,\"decimal_25_18\":1,\"decimal_25_24\":3,\"decimal_35_0\":1,\"decimal_35_6\":1,\"decimal_35_12\":1,\"decimal_35_18\":1,\"decimal_35_24\":1,\"decimal_35_30\":1,\"decimal_38_0\":1,\"decimal_38_6\":1,\"decimal_38_12\":1,\"decimal_38_18\":1,\"decimal_38_24\":1,\"decimal_38_30\":1,\"decimal_38_36\":1}}"}}