diff --git a/docs/layouts/shortcodes/generated/spark_connector_configuration.html b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
index 9f2e6962eb3c..3d271d970818 100644
--- a/docs/layouts/shortcodes/generated/spark_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
@@ -56,6 +56,12 @@
<td>Long</td>
<td>The minimum number of rows returned in a single batch, which used to create MinRowsReadLimit with read.stream.maxTriggerDelayMs together.</td>
</tr>
+ <tr>
+ <td><h5>read.readChangeLog</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to read rows in the form of a changelog (adding a RowKind column to each row to represent its change type).</td>
+ </tr>
<tr>
<td><h5>write.merge-schema</h5></td>
<td style="word-wrap: break-word;">false</td>
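
For reference, a minimal sketch of how the new option is meant to be consumed from Spark Structured Streaming (the table path is a placeholder; the table is assumed to have a primary key and a changelog producer, as in the tests added below):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
val tablePath = "/tmp/paimon/default.db/T" // placeholder location

// With read.readChangeLog=true each streamed row carries its change type
// ("+I", "-U", "+U", "-D") in a leading column, followed by the data columns.
val query = spark.readStream
  .format("paimon")
  .option("read.readChangeLog", "true")
  .load(tablePath)
  .writeStream
  .format("console")
  .start()
```
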
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
index 5e1cac68b4d7..6ee72b0b482c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
@@ -51,6 +51,15 @@ public static RowType project(RowType inputType, int[] mapping) {
Arrays.stream(mapping).mapToObj(fields::get).collect(Collectors.toList()));
}
+ public static RowType project(RowType inputType, List<String> names) {
+ List<DataField> fields = inputType.getFields();
+ List<String> fieldNames = fields.stream().map(DataField::name).collect(Collectors.toList());
+ return new RowType(
+ names.stream()
+ .map(k -> fields.get(fieldNames.indexOf(k)))
+ .collect(Collectors.toList()));
+ }
+
public static Object castFromString(String s, DataType type) {
return castFromStringInternal(s, type, false);
}
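
A small sketch of the new name-based projection; the schema below is made up for illustration:

```scala
import org.apache.paimon.types.{DataTypes, RowType}
import org.apache.paimon.utils.TypeUtils
import scala.collection.JavaConverters._

// Hypothetical row type (a INT, dt STRING, b BIGINT). Projecting by a list of
// field names keeps only the matching DataFields, in the order given.
val rowType = RowType.builder()
  .field("a", DataTypes.INT())
  .field("dt", DataTypes.STRING())
  .field("b", DataTypes.BIGINT())
  .build()

val projected = TypeUtils.project(rowType, Seq("dt", "a").asJava)
// projected now contains the fields dt STRING, a INT
```
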
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
index 2f9e9297ce83..043ce05bfe4c 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
@@ -69,4 +69,11 @@ public class SparkConnectorOptions {
.noDefaultValue()
.withDescription(
"The maximum delay between two adjacent batches, which used to create MinRowsReadLimit with read.stream.minRowsPerTrigger together.");
+
+ public static final ConfigOption<Boolean> READ_CHANGELOG =
+ key("read.readChangeLog")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to read row in the form of changelog (add RowKind in row to represent its change type).");
}
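
The option is resolved from the DataSource options map like any other connector option; a brief sketch, mirroring the check added to SparkSource.loadTable further down:

```scala
import org.apache.paimon.options.Options
import org.apache.paimon.spark.SparkConnectorOptions
import scala.collection.JavaConverters._

// Boolean options fall back to their declared default (false here) when the
// key is absent from the user-supplied map.
val options = Options.fromMap(Map("read.readChangeLog" -> "true").asJava)
val readChangelog: Boolean = options.get(SparkConnectorOptions.READ_CHANGELOG)
```
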
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
index e277834b5e75..04d959da630d 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
@@ -19,7 +19,7 @@
package org.apache.paimon.spark;
import org.apache.paimon.spark.sources.PaimonMicroBatchStream;
-import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
@@ -83,7 +83,7 @@ public PartitionReaderFactory createReaderFactory() {
@Override
public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
- return new PaimonMicroBatchStream((FileStoreTable) table, readBuilder, checkpointLocation);
+ return new PaimonMicroBatchStream((DataTable) table, readBuilder, checkpointLocation);
}
protected List<Split> splits() {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
index 9495cb0fe9cc..b78d56851eaf 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
@@ -21,7 +21,8 @@ import org.apache.paimon.catalog.CatalogContext
import org.apache.paimon.options.Options
import org.apache.paimon.spark.commands.WriteIntoPaimonTable
import org.apache.paimon.spark.sources.PaimonSink
-import org.apache.paimon.table.{FileStoreTable, FileStoreTableFactory}
+import org.apache.paimon.table.{DataTable, FileStoreTable, FileStoreTableFactory}
+import org.apache.paimon.table.system.AuditLogTable
import org.apache.spark.sql.{DataFrame, SaveMode => SparkSaveMode, SparkSession, SQLContext}
import org.apache.spark.sql.connector.catalog.{SessionConfigSupport, Table}
@@ -70,17 +71,22 @@ class SparkSource
mode: SparkSaveMode,
parameters: Map[String, String],
data: DataFrame): BaseRelation = {
- val table = loadTable(parameters.asJava)
+ val table = loadTable(parameters.asJava).asInstanceOf[FileStoreTable]
WriteIntoPaimonTable(table, SaveMode.transform(mode), data, Options.fromMap(parameters.asJava))
.run(sqlContext.sparkSession)
SparkSource.toBaseRelation(table, sqlContext)
}
- private def loadTable(options: JMap[String, String]): FileStoreTable = {
+ private def loadTable(options: JMap[String, String]): DataTable = {
val catalogContext = CatalogContext.create(
Options.fromMap(options),
SparkSession.active.sessionState.newHadoopConf())
- FileStoreTableFactory.create(catalogContext)
+ val table = FileStoreTableFactory.create(catalogContext)
+ if (Options.fromMap(options).get(SparkConnectorOptions.READ_CHANGELOG)) {
+ new AuditLogTable(table)
+ } else {
+ table
+ }
}
override def createSink(
@@ -91,7 +97,7 @@ class SparkSource
if (outputMode != OutputMode.Append && outputMode != OutputMode.Complete) {
throw new RuntimeException("Paimon supports only Complete and Append output mode.")
}
- val table = loadTable(parameters.asJava)
+ val table = loadTable(parameters.asJava).asInstanceOf[FileStoreTable]
val options = Options.fromMap(parameters.asJava)
new PaimonSink(sqlContext, table, partitionColumns, outputMode, options)
}
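
The wrapping itself is a thin decoration: when the flag is set, the loaded FileStoreTable is wrapped in the existing AuditLogTable system table, which exposes the same data plus each row's RowKind. A sketch of the equivalent standalone construction (the location and the "path" option key follow the usual DataSource load(path) convention; treat the exact option map as an assumption):

```scala
import org.apache.paimon.catalog.CatalogContext
import org.apache.paimon.options.Options
import org.apache.paimon.table.{DataTable, FileStoreTableFactory}
import org.apache.paimon.table.system.AuditLogTable
import scala.collection.JavaConverters._

val tableLocation = "/tmp/paimon/default.db/T" // placeholder

// Load the physical table, then decorate it so scans emit RowKind as well.
val context = CatalogContext.create(
  Options.fromMap(Map("path" -> tableLocation).asJava))
val fileStoreTable = FileStoreTableFactory.create(context)
val withChangelog: DataTable = new AuditLogTable(fileStoreTable)
```
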
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
index f1f8f7e8cdff..702065a134e0 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
@@ -19,7 +19,7 @@ package org.apache.paimon.spark.sources
import org.apache.paimon.options.Options
import org.apache.paimon.spark.{PaimonImplicits, SparkConnectorOptions, SparkInputPartition, SparkReaderFactory}
-import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.table.DataTable
import org.apache.paimon.table.source.ReadBuilder
import org.apache.spark.internal.Logging
@@ -29,7 +29,7 @@ import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset,
import scala.collection.mutable
class PaimonMicroBatchStream(
- originTable: FileStoreTable,
+ originTable: DataTable,
readBuilder: ReadBuilder,
checkpointLocation: String)
extends MicroBatchStream
@@ -144,6 +144,6 @@ class PaimonMicroBatchStream(
override def stop(): Unit = {}
- override def table: FileStoreTable = originTable
+ override def table: DataTable = originTable
}
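
Widening the stream's table field from FileStoreTable to DataTable is what lets the same micro-batch path serve both the plain table and its audit-log wrapper; a brief illustrative helper (not part of the patch):

```scala
import org.apache.paimon.spark.sources.PaimonMicroBatchStream
import org.apache.paimon.table.DataTable

// Either a FileStoreTable or an AuditLogTable can be passed here, since both
// implement DataTable; the read builder is derived the same way for both.
def microBatchStream(table: DataTable, checkpointLocation: String): PaimonMicroBatchStream =
  new PaimonMicroBatchStream(table, table.newReadBuilder(), checkpointLocation)
```
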
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
index 13afc81a4c29..04a90259bee9 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
@@ -20,11 +20,11 @@ package org.apache.paimon.spark.sources
import org.apache.paimon.CoreOptions
import org.apache.paimon.data.BinaryRow
import org.apache.paimon.spark.SparkTypeUtils
-import org.apache.paimon.spark.commands.WithFileStoreTable
-import org.apache.paimon.table.source.{DataSplit, InnerStreamTableScan, ScanMode}
+import org.apache.paimon.table.DataTable
+import org.apache.paimon.table.source.{DataSplit, InnerStreamTableScan}
import org.apache.paimon.table.source.TableScan.Plan
import org.apache.paimon.table.source.snapshot.StartingContext
-import org.apache.paimon.utils.RowDataPartitionComputer
+import org.apache.paimon.utils.{RowDataPartitionComputer, TypeUtils}
import org.apache.spark.sql.connector.read.streaming.ReadLimit
import org.apache.spark.sql.execution.datasources.PartitioningUtils
@@ -35,7 +35,9 @@ import scala.collection.mutable
case class IndexedDataSplit(snapshotId: Long, index: Long, entry: DataSplit)
-trait StreamHelper extends WithFileStoreTable {
+trait StreamHelper {
+
+ def table: DataTable
val initOffset: PaimonSourceOffset
@@ -44,12 +46,12 @@ trait StreamHelper extends WithFileStoreTable {
private lazy val streamScan: InnerStreamTableScan = table.newStreamScan()
private lazy val partitionSchema: StructType =
- SparkTypeUtils.fromPaimonRowType(table.schema().logicalPartitionType())
+ SparkTypeUtils.fromPaimonRowType(TypeUtils.project(table.rowType(), table.partitionKeys()))
private lazy val partitionComputer: RowDataPartitionComputer = new RowDataPartitionComputer(
- new CoreOptions(table.schema.options).partitionDefaultName,
- table.schema.logicalPartitionType,
- table.schema.partitionKeys.asScala.toArray
+ new CoreOptions(table.options).partitionDefaultName,
+ TypeUtils.project(table.rowType(), table.partitionKeys()),
+ table.partitionKeys().asScala.toArray
)
// Used to get the initial offset.
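
With only the DataTable surface available, the partition row type is recovered by projecting the full row type with the table's partition keys, and the partition computer is assembled exactly as before. An illustrative helper (not part of the patch) showing the intended shape:

```scala
import org.apache.paimon.CoreOptions
import org.apache.paimon.table.DataTable
import org.apache.paimon.utils.{RowDataPartitionComputer, TypeUtils}
import scala.collection.JavaConverters._

// Equivalent to the previous schema.logicalPartitionType()-based setup, but
// expressed against DataTable so it also works for the audit-log wrapper.
def partitionComputer(table: DataTable): RowDataPartitionComputer =
  new RowDataPartitionComputer(
    new CoreOptions(table.options()).partitionDefaultName(),
    TypeUtils.project(table.rowType(), table.partitionKeys()),
    table.partitionKeys().asScala.toArray)
```
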
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala
new file mode 100644
index 000000000000..782ae9cd5689
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark
+
+import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.streaming.StreamTest
+
+class PaimonCDCSourceTest extends PaimonSparkTestBase with StreamTest {
+
+ import testImplicits._
+
+ test("Paimon CDC Source: batch write and streaming read change-log with default scan mode") {
+ withTempDir {
+ checkpointDir =>
+ val tableName = "T"
+ spark.sql(s"DROP TABLE IF EXISTS $tableName")
+ spark.sql(s"""
+ |CREATE TABLE $tableName (a INT, b STRING)
+ |TBLPROPERTIES (
+ | 'primary-key'='a',
+ | 'write-mode'='change-log',
+ | 'bucket'='2',
+ | 'changelog-producer' = 'lookup')
+ |""".stripMargin)
+
+ spark.sql(s"INSERT INTO $tableName VALUES (1, 'v_1')")
+ spark.sql(s"INSERT INTO $tableName VALUES (2, 'v_2')")
+ spark.sql(s"INSERT INTO $tableName VALUES (2, 'v_2_new')")
+
+ val table = loadTable(tableName)
+ val location = table.location().getPath
+
+ val readStream = spark.readStream
+ .format("paimon")
+ .option("read.readChangeLog", "true")
+ .load(location)
+ .writeStream
+ .format("memory")
+ .option("checkpointLocation", checkpointDir.getCanonicalPath)
+ .queryName("mem_table")
+ .outputMode("append")
+ .start()
+
+ val currentResult = () => spark.sql("SELECT * FROM mem_table")
+ try {
+ readStream.processAllAvailable()
+ val expectedResult1 = Row("+I", 1, "v_1") :: Row("+I", 2, "v_2_new") :: Nil
+ checkAnswer(currentResult(), expectedResult1)
+
+ spark.sql(s"INSERT INTO $tableName VALUES (1, 'v_1_new'), (3, 'v_3')")
+ readStream.processAllAvailable()
+ val expectedResult2 =
+ Row("+I", 1, "v_1") :: Row("-U", 1, "v_1") ::
+ Row("+U", 1, "v_1_new") ::
+ Row("+I", 2, "v_2_new") ::
+ Row("+I", 3, "v_3") :: Nil
+ checkAnswer(currentResult(), expectedResult2)
+ } finally {
+ readStream.stop()
+ }
+ }
+ }
+
+ test("Paimon CDC Source: batch write and streaming read change-log with scan.snapshot-id") {
+ withTempDir {
+ checkpointDir =>
+ val tableName = "T"
+ spark.sql(s"DROP TABLE IF EXISTS $tableName")
+ spark.sql(s"""
+ |CREATE TABLE $tableName (a INT, b STRING)
+ |TBLPROPERTIES (
+ | 'primary-key'='a',
+ | 'write-mode'='change-log',
+ | 'bucket'='2',
+ | 'changelog-producer' = 'lookup')
+ |""".stripMargin)
+
+ spark.sql(s"INSERT INTO $tableName VALUES (1, 'v_1')")
+ spark.sql(s"INSERT INTO $tableName VALUES (2, 'v_2')")
+ spark.sql(s"INSERT INTO $tableName VALUES (2, 'v_2_new')")
+
+ val table = loadTable(tableName)
+ val location = table.location().getPath
+
+ val readStream = spark.readStream
+ .format("paimon")
+ .option("read.readChangeLog", "true")
+ .option("scan.mode", "from-snapshot")
+ .option("scan.snapshot-id", 1)
+ .load(location)
+ .writeStream
+ .format("memory")
+ .option("checkpointLocation", checkpointDir.getCanonicalPath)
+ .queryName("mem_table")
+ .outputMode("append")
+ .start()
+
+ val currentResult = () => spark.sql("SELECT * FROM mem_table")
+ try {
+ readStream.processAllAvailable()
+ val expectedResult1 =
+ Row("+I", 1, "v_1") :: Row("+I", 2, "v_2") ::
+ Row("-U", 2, "v_2") ::
+ Row("+U", 2, "v_2_new") :: Nil
+ checkAnswer(currentResult(), expectedResult1)
+
+ spark.sql(s"INSERT INTO $tableName VALUES (1, 'v_1_new'), (3, 'v_3')")
+ readStream.processAllAvailable()
+ val expectedResult2 =
+ Row("+I", 1, "v_1") :: Row("-U", 1, "v_1") :: Row("+U", 1, "v_1_new") ::
+ Row("+I", 2, "v_2") ::
+ Row("-U", 2, "v_2") :: Row("+U", 2, "v_2_new") ::
+ Row("+I", 3, "v_3") :: Nil
+ checkAnswer(currentResult(), expectedResult2)
+ } finally {
+ readStream.stop()
+ }
+ }
+ }
+
+ test("Paimon CDC Source: streaming write and streaming read change-log") {
+ withTempDirs {
+ (checkpointDir1, checkpointDir2) =>
+ val tableName = "T"
+ spark.sql(s"DROP TABLE IF EXISTS $tableName")
+ spark.sql(s"""
+ |CREATE TABLE $tableName (a INT, b STRING)
+ |TBLPROPERTIES (
+ | 'primary-key'='a',
+ | 'write-mode'='change-log',
+ | 'bucket'='2',
+ | 'changelog-producer' = 'lookup')
+ |""".stripMargin)
+
+ val table = loadTable(tableName)
+ val location = table.location().getPath
+
+ // streaming write
+ val inputData = MemoryStream[(Int, String)]
+ val writeStream = inputData
+ .toDS()
+ .toDF("a", "b")
+ .writeStream
+ .option("checkpointLocation", checkpointDir1.getCanonicalPath)
+ .foreachBatch {
+ (batch: Dataset[Row], _: Long) =>
+ batch.write.format("paimon").mode("append").save(location)
+ }
+ .start()
+
+ // streaming read
+ val readStream = spark.readStream
+ .format("paimon")
+ .option("read.readChangeLog", "true")
+ .option("scan.mode", "from-snapshot")
+ .option("scan.snapshot-id", 1)
+ .load(location)
+ .writeStream
+ .format("memory")
+ .option("checkpointLocation", checkpointDir2.getCanonicalPath)
+ .queryName("mem_table")
+ .outputMode("append")
+ .start()
+
+ val currentResult = () => spark.sql("SELECT * FROM mem_table")
+ try {
+ inputData.addData((1, "v_1"))
+ writeStream.processAllAvailable()
+ readStream.processAllAvailable()
+ val expectedResult1 = Row("+I", 1, "v_1") :: Nil
+ checkAnswer(currentResult(), expectedResult1)
+
+ inputData.addData((2, "v_2"))
+ writeStream.processAllAvailable()
+ readStream.processAllAvailable()
+ val expectedResult2 = Row("+I", 1, "v_1") :: Row("+I", 2, "v_2") :: Nil
+ checkAnswer(currentResult(), expectedResult2)
+
+ inputData.addData((2, "v_2_new"))
+ writeStream.processAllAvailable()
+ readStream.processAllAvailable()
+ val expectedResult3 =
+ Row("+I", 1, "v_1") :: Row("+I", 2, "v_2") ::
+ Row("-U", 2, "v_2") ::
+ Row("+U", 2, "v_2_new") :: Nil
+ checkAnswer(currentResult(), expectedResult3)
+
+ inputData.addData((1, "v_1_new"), (3, "v_3"))
+ writeStream.processAllAvailable()
+ readStream.processAllAvailable()
+ val expectedResult4 =
+ Row("+I", 1, "v_1") :: Row("-U", 1, "v_1") :: Row("+U", 1, "v_1_new") ::
+ Row("+I", 2, "v_2") ::
+ Row("-U", 2, "v_2") :: Row("+U", 2, "v_2_new") ::
+ Row("+I", 3, "v_3") :: Nil
+ checkAnswer(currentResult(), expectedResult4)
+ } finally {
+ readStream.stop()
+ }
+ }
+ }
+
+}
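
As a follow-up to the tests above, a hedged sketch of how a consumer might split the resulting change stream by its leading change-type column (the column name `rowkind` comes from the audit-log schema; treat it as an assumption here):

```scala
// Assumes the streaming query has materialized results into `mem_table`,
// as in the tests, with the change type in the first column.
val changes  = spark.table("mem_table")
val upserts  = changes.where("rowkind IN ('+I', '+U')")
val retracts = changes.where("rowkind IN ('-U', '-D')")
```
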