Commit

version1

Zouxxyy committed Oct 10, 2023
1 parent bbb7c4f commit 03ed3ac
Showing 8 changed files with 267 additions and 18 deletions.
@@ -56,6 +56,12 @@
<td>Long</td>
<td>The minimum number of rows returned in a single batch, which is used to create MinRowsReadLimit together with read.stream.maxTriggerDelayMs.</td>
</tr>
<tr>
<td><h5>read.readChangeLog</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to read rows in the form of a changelog (a row kind column is added to each row to indicate its change type).</td>
</tr>
<tr>
<td><h5>write.merge-schema</h5></td>
<td style="word-wrap: break-word;">false</td>
@@ -51,6 +51,15 @@ public static RowType project(RowType inputType, int[] mapping) {
Arrays.stream(mapping).mapToObj(fields::get).collect(Collectors.toList()));
}

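/** Projects {@code inputType} onto the fields whose names are given in {@code names}, in that order. */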
public static RowType project(RowType inputType, List<String> names) {
List<DataField> fields = inputType.getFields();
List<String> fieldNames = fields.stream().map(DataField::name).collect(Collectors.toList());
return new RowType(
names.stream()
.map(k -> fields.get(fieldNames.indexOf(k)))
.collect(Collectors.toList()));
}

public static Object castFromString(String s, DataType type) {
return castFromStringInternal(s, type, false);
}
@@ -69,4 +69,11 @@ public class SparkConnectorOptions {
.noDefaultValue()
.withDescription(
"The maximum delay between two adjacent batches, which used to create MinRowsReadLimit with read.stream.minRowsPerTrigger together.");

public static final ConfigOption<Boolean> READ_CHANGELOG =
key("read.readChangeLog")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to read row in the form of changelog (add RowKind in row to represent its change type).");
}
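For context, a minimal sketch (not part of this commit) of how the new option can be enabled from the Spark structured streaming API, mirroring the test added below; the table location /tmp/paimon/T is a hypothetical placeholder:

// Hypothetical location of an existing Paimon primary-key table.
val location = "/tmp/paimon/T"
// Read the table as a changelog stream; each row carries an extra row-kind column
// ("+I", "-U", "+U", "-D") describing its change type.
val changelog = spark.readStream
  .format("paimon")
  .option("read.readChangeLog", "true")
  .load(location)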
@@ -19,7 +19,7 @@
package org.apache.paimon.spark;

import org.apache.paimon.spark.sources.PaimonMicroBatchStream;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
@@ -83,7 +83,7 @@ public PartitionReaderFactory createReaderFactory() {

@Override
public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
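// With read.readChangeLog the underlying table may be an AuditLogTable, which is a DataTable
// but not a FileStoreTable, so cast to DataTable here.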
return new PaimonMicroBatchStream((FileStoreTable) table, readBuilder, checkpointLocation);
return new PaimonMicroBatchStream((DataTable) table, readBuilder, checkpointLocation);
}

protected List<Split> splits() {
@@ -21,7 +21,8 @@ import org.apache.paimon.catalog.CatalogContext
import org.apache.paimon.options.Options
import org.apache.paimon.spark.commands.WriteIntoPaimonTable
import org.apache.paimon.spark.sources.PaimonSink
import org.apache.paimon.table.{FileStoreTable, FileStoreTableFactory}
import org.apache.paimon.table.{DataTable, FileStoreTable, FileStoreTableFactory}
import org.apache.paimon.table.system.AuditLogTable

import org.apache.spark.sql.{DataFrame, SaveMode => SparkSaveMode, SparkSession, SQLContext}
import org.apache.spark.sql.connector.catalog.{SessionConfigSupport, Table}
@@ -70,17 +71,22 @@ class SparkSource
mode: SparkSaveMode,
parameters: Map[String, String],
data: DataFrame): BaseRelation = {
val table = loadTable(parameters.asJava)
val table = loadTable(parameters.asJava).asInstanceOf[FileStoreTable]
WriteIntoPaimonTable(table, SaveMode.transform(mode), data, Options.fromMap(parameters.asJava))
.run(sqlContext.sparkSession)
SparkSource.toBaseRelation(table, sqlContext)
}

private def loadTable(options: JMap[String, String]): FileStoreTable = {
private def loadTable(options: JMap[String, String]): DataTable = {
val catalogContext = CatalogContext.create(
Options.fromMap(options),
SparkSession.active.sessionState.newHadoopConf())
FileStoreTableFactory.create(catalogContext)
val table = FileStoreTableFactory.create(catalogContext)
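// When read.readChangeLog is enabled, wrap the table in the AuditLogTable system table
// so that reads expose each record's row kind as an additional column.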
if (Options.fromMap(options).get(SparkConnectorOptions.READ_CHANGELOG)) {
new AuditLogTable(table)
} else {
table
}
}

override def createSink(
@@ -91,7 +97,7 @@ class SparkSource
if (outputMode != OutputMode.Append && outputMode != OutputMode.Complete) {
throw new RuntimeException("Paimon supports only Complete and Append output mode.")
}
val table = loadTable(parameters.asJava)
val table = loadTable(parameters.asJava).asInstanceOf[FileStoreTable]
val options = Options.fromMap(parameters.asJava)
new PaimonSink(sqlContext, table, partitionColumns, outputMode, options)
}
@@ -19,7 +19,7 @@ package org.apache.paimon.spark.sources

import org.apache.paimon.options.Options
import org.apache.paimon.spark.{PaimonImplicits, SparkConnectorOptions, SparkInputPartition, SparkReaderFactory}
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.DataTable
import org.apache.paimon.table.source.ReadBuilder

import org.apache.spark.internal.Logging
@@ -29,7 +29,7 @@ import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset,
import scala.collection.mutable

class PaimonMicroBatchStream(
originTable: FileStoreTable,
originTable: DataTable,
readBuilder: ReadBuilder,
checkpointLocation: String)
extends MicroBatchStream
@@ -144,6 +144,6 @@ class PaimonMicroBatchStream(

override def stop(): Unit = {}

override def table: FileStoreTable = originTable
override def table: DataTable = originTable

}
@@ -20,11 +20,11 @@ package org.apache.paimon.spark.sources
import org.apache.paimon.CoreOptions
import org.apache.paimon.data.BinaryRow
import org.apache.paimon.spark.SparkTypeUtils
import org.apache.paimon.spark.commands.WithFileStoreTable
import org.apache.paimon.table.source.{DataSplit, InnerStreamTableScan, ScanMode}
import org.apache.paimon.table.DataTable
import org.apache.paimon.table.source.{DataSplit, InnerStreamTableScan}
import org.apache.paimon.table.source.TableScan.Plan
import org.apache.paimon.table.source.snapshot.StartingContext
import org.apache.paimon.utils.RowDataPartitionComputer
import org.apache.paimon.utils.{RowDataPartitionComputer, TypeUtils}

import org.apache.spark.sql.connector.read.streaming.ReadLimit
import org.apache.spark.sql.execution.datasources.PartitioningUtils
@@ -35,7 +35,9 @@ import scala.collection.mutable

case class IndexedDataSplit(snapshotId: Long, index: Long, entry: DataSplit)

trait StreamHelper extends WithFileStoreTable {
trait StreamHelper {

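// May be a FileStoreTable or a wrapping system table such as AuditLogTable when reading the changelog.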
def table: DataTable

val initOffset: PaimonSourceOffset

@@ -44,12 +46,12 @@
private lazy val streamScan: InnerStreamTableScan = table.newStreamScan()

private lazy val partitionSchema: StructType =
SparkTypeUtils.fromPaimonRowType(table.schema().logicalPartitionType())
SparkTypeUtils.fromPaimonRowType(TypeUtils.project(table.rowType(), table.partitionKeys()))

private lazy val partitionComputer: RowDataPartitionComputer = new RowDataPartitionComputer(
new CoreOptions(table.schema.options).partitionDefaultName,
table.schema.logicalPartitionType,
table.schema.partitionKeys.asScala.toArray
new CoreOptions(table.options).partitionDefaultName,
TypeUtils.project(table.rowType(), table.partitionKeys()),
table.partitionKeys().asScala.toArray
)

// Used to get the initial offset.
@@ -0,0 +1,219 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.paimon.spark

import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.StreamTest

class PaimonCDCSourceTest extends PaimonSparkTestBase with StreamTest {

import testImplicits._

test("Paimon CDC Source: batch write and streaming read change-log with default scan mode") {
withTempDir {
checkpointDir =>
val tableName = "T"
spark.sql(s"DROP TABLE IF EXISTS $tableName")
spark.sql(s"""
|CREATE TABLE $tableName (a INT, b STRING)
|TBLPROPERTIES (
| 'primary-key'='a',
| 'write-mode'='change-log',
| 'bucket'='2',
| 'changelog-producer' = 'lookup')
|""".stripMargin)

spark.sql(s"INSERT INTO $tableName VALUES (1, 'v_1')")
spark.sql(s"INSERT INTO $tableName VALUES (2, 'v_2')")
spark.sql(s"INSERT INTO $tableName VALUES (2, 'v_2_new')")

val table = loadTable(tableName)
val location = table.location().getPath

val readStream = spark.readStream
.format("paimon")
.option("read.readChangeLog", "true")
.load(location)
.writeStream
.format("memory")
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.queryName("mem_table")
.outputMode("append")
.start()

val currentResult = () => spark.sql("SELECT * FROM mem_table")
try {
readStream.processAllAvailable()
val expectedResult1 = Row("+I", 1, "v_1") :: Row("+I", 2, "v_2_new") :: Nil
checkAnswer(currentResult(), expectedResult1)

spark.sql(s"INSERT INTO $tableName VALUES (1, 'v_1_new'), (3, 'v_3')")
readStream.processAllAvailable()
val expectedResult2 =
  Row("+I", 1, "v_1") :: Row("-U", 1, "v_1") :: Row("+U", 1, "v_1_new") ::
    Row("+I", 2, "v_2_new") :: Row("+I", 3, "v_3") :: Nil
checkAnswer(currentResult(), expectedResult2)
} finally {
readStream.stop()
}
}
}

test("Paimon CDC Source: batch write and streaming read change-log with scan.snapshot-id") {
withTempDir {
checkpointDir =>
val tableName = "T"
spark.sql(s"DROP TABLE IF EXISTS $tableName")
spark.sql(s"""
|CREATE TABLE $tableName (a INT, b STRING)
|TBLPROPERTIES (
| 'primary-key'='a',
| 'write-mode'='change-log',
| 'bucket'='2',
| 'changelog-producer' = 'lookup')
|""".stripMargin)

spark.sql(s"INSERT INTO $tableName VALUES (1, 'v_1')")
spark.sql(s"INSERT INTO $tableName VALUES (2, 'v_2')")
spark.sql(s"INSERT INTO $tableName VALUES (2, 'v_2_new')")

val table = loadTable(tableName)
val location = table.location().getPath

val readStream = spark.readStream
.format("paimon")
.option("read.readChangeLog", "true")
.option("scan.mode", "from-snapshot")
.option("scan.snapshot-id", 1)
.load(location)
.writeStream
.format("memory")
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.queryName("mem_table")
.outputMode("append")
.start()

val currentResult = () => spark.sql("SELECT * FROM mem_table")
try {
readStream.processAllAvailable()
val expectedResult1 =
  Row("+I", 1, "v_1") :: Row("+I", 2, "v_2") :: Row("-U", 2, "v_2") :: Row("+U", 2, "v_2_new") :: Nil
checkAnswer(currentResult(), expectedResult1)

spark.sql(s"INSERT INTO $tableName VALUES (1, 'v_1_new'), (3, 'v_3')")
readStream.processAllAvailable()
val expectedResult2 =
  Row("+I", 1, "v_1") :: Row("-U", 1, "v_1") :: Row("+U", 1, "v_1_new") ::
    Row("+I", 2, "v_2") :: Row("-U", 2, "v_2") :: Row("+U", 2, "v_2_new") :: Row("+I", 3, "v_3") :: Nil
checkAnswer(currentResult(), expectedResult2)
} finally {
readStream.stop()
}
}
}

test("Paimon CDC Source: streaming write and streaming read change-log") {
withTempDirs {
(checkpointDir1, checkpointDir2) =>
val tableName = "T"
spark.sql(s"DROP TABLE IF EXISTS $tableName")
spark.sql(s"""
|CREATE TABLE $tableName (a INT, b STRING)
|TBLPROPERTIES (
| 'primary-key'='a',
| 'write-mode'='change-log',
| 'bucket'='2',
| 'changelog-producer' = 'lookup')
|""".stripMargin)

val table = loadTable(tableName)
val location = table.location().getPath

// streaming write
val inputData = MemoryStream[(Int, String)]
val writeStream = inputData
.toDS()
.toDF("a", "b")
.writeStream
.option("checkpointLocation", checkpointDir1.getCanonicalPath)
.foreachBatch {
(batch: Dataset[Row], _: Long) =>
batch.write.format("paimon").mode("append").save(location)
}
.start()

// streaming read
val readStream = spark.readStream
.format("paimon")
.option("read.readChangeLog", "true")
.option("scan.mode", "from-snapshot")
.option("scan.snapshot-id", 1)
.load(location)
.writeStream
.format("memory")
.option("checkpointLocation", checkpointDir2.getCanonicalPath)
.queryName("mem_table")
.outputMode("append")
.start()

val currentResult = () => spark.sql("SELECT * FROM mem_table")
try {
inputData.addData((1, "v_1"))
writeStream.processAllAvailable()
readStream.processAllAvailable()
val expectedResult1 = Row("+I", 1, "v_1") :: Nil
checkAnswer(currentResult(), expectedResult1)

inputData.addData((2, "v_2"))
writeStream.processAllAvailable()
readStream.processAllAvailable()
val expectedResult2 = Row("+I", 1, "v_1") :: Row("+I", 2, "v_2") :: Nil
checkAnswer(currentResult(), expectedResult2)

inputData.addData((2, "v_2_new"))
writeStream.processAllAvailable()
readStream.processAllAvailable()
val expectedResult3 =
  Row("+I", 1, "v_1") :: Row("+I", 2, "v_2") :: Row("-U", 2, "v_2") :: Row("+U", 2, "v_2_new") :: Nil
checkAnswer(currentResult(), expectedResult3)

inputData.addData((1, "v_1_new"), (3, "v_3"))
writeStream.processAllAvailable()
readStream.processAllAvailable()
val expectedResult4 =
  Row("+I", 1, "v_1") :: Row("-U", 1, "v_1") :: Row("+U", 1, "v_1_new") ::
    Row("+I", 2, "v_2") :: Row("-U", 2, "v_2") :: Row("+U", 2, "v_2_new") :: Row("+I", 3, "v_3") :: Nil
checkAnswer(currentResult(), expectedResult4)
} finally {
readStream.stop()
}
}
}

}
