[spark] Minor refactor SparkWriter to separate commit (apache#3067)
YannByron authored and yuzelin committed Mar 22, 2024
1 parent d38fc4c commit c600f46
Showing 7 changed files with 367 additions and 275 deletions.
@@ -25,6 +25,7 @@
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.BinaryTableStats;
import org.apache.paimon.table.query.LocalTableQuery;
import org.apache.paimon.table.sink.RowKeyExtractor;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.types.RowType;
@@ -108,4 +109,6 @@ default BinaryTableStats getSchemaFieldStats(DataFileMeta dataFileMeta) {
}

boolean supportStreamingReadOverwrite();

RowKeyExtractor createRowKeyExtractor();
}
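The new createRowKeyExtractor() gives Spark-side code a direct way to compute the bucket of a single row, which CommonBucketProcessor below relies on. A minimal sketch of that usage, assuming an already-loaded FileStoreTable (the helper name bucketOf and the standalone Row are illustrative, not part of this commit):

import org.apache.paimon.spark.SparkRow
import org.apache.paimon.table.FileStoreTable
import org.apache.spark.sql.Row

// Derive the bucket id of one Spark row via the new extractor.
def bucketOf(table: FileStoreTable, row: Row): Int = {
  val extractor = table.createRowKeyExtractor()
  extractor.setRecord(new SparkRow(table.rowType(), row))
  extractor.bucket()
}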
@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.commands

import org.apache.paimon.data.{InternalRow => PaimonInternalRow}
import org.apache.paimon.index.HashBucketAssigner
import org.apache.paimon.spark.SparkRow
import org.apache.paimon.spark.util.EncoderUtils
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.sink.RowPartitionKeyExtractor

import org.apache.spark.TaskContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{InternalRow => SparkInternalRow}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.{Deserializer, Serializer}
import org.apache.spark.sql.types.StructType

import java.util.UUID

case class EncoderSerDeGroup(schema: StructType) {

val encoder: ExpressionEncoder[Row] = EncoderUtils.encode(schema).resolveAndBind()

private val serializer: Serializer[Row] = encoder.createSerializer()

private val deserializer: Deserializer[Row] = encoder.createDeserializer()

def rowToInternal(row: Row): SparkInternalRow = {
serializer(row)
}

def internalToRow(internalRow: SparkInternalRow): Row = {
deserializer(internalRow)
}
}
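For reference, EncoderSerDeGroup is just a serializer/deserializer pair around one resolved ExpressionEncoder; the processors below use it to rewrite the bucket column in place. A hedged round-trip sketch, not part of the committed file (schema, column names, and values are illustrative assumptions, and EncoderSerDeGroup is assumed to be importable from this package):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("pk", StringType, nullable = false),
  StructField("_bucket_", IntegerType, nullable = false)))
val group = EncoderSerDeGroup(schema)

val in = Row("key-1", -1)
val internal = group.rowToInternal(in)  // external Row -> Catalyst InternalRow
internal.setInt(1, 42)                  // overwrite the bucket column in place
val out = group.internalToRow(internal) // back to Row("key-1", 42)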

sealed trait BucketProcessor {
def processPartition(rowIterator: Iterator[Row]): Iterator[Row]
}

case class CommonBucketProcessor(
table: FileStoreTable,
bucketColIndex: Int,
encoderGroup: EncoderSerDeGroup)
extends BucketProcessor {

def processPartition(rowIterator: Iterator[Row]): Iterator[Row] = {
val rowType = table.rowType()
val rowKeyExtractor = table.createRowKeyExtractor()

def getBucketId(row: PaimonInternalRow): Int = {
rowKeyExtractor.setRecord(row)
rowKeyExtractor.bucket()
}

new Iterator[Row] {
override def hasNext: Boolean = rowIterator.hasNext

override def next(): Row = {
val row = rowIterator.next
val sparkInternalRow = encoderGroup.rowToInternal(row)
sparkInternalRow.setInt(bucketColIndex, getBucketId((new SparkRow(rowType, row))))
encoderGroup.internalToRow(sparkInternalRow)
}
}
}
}

case class DynamicBucketProcessor(
fileStoreTable: FileStoreTable,
bucketColIndex: Int,
numSparkPartitions: Int,
numAssigners: Int,
encoderGroup: EncoderSerDeGroup
) extends BucketProcessor {

private val targetBucketRowNumber = fileStoreTable.coreOptions.dynamicBucketTargetRowNum
private val rowType = fileStoreTable.rowType
private val commitUser = UUID.randomUUID.toString

def processPartition(rowIterator: Iterator[Row]): Iterator[Row] = {
val rowPartitionKeyExtractor = new RowPartitionKeyExtractor(fileStoreTable.schema)
val assigner = new HashBucketAssigner(
fileStoreTable.snapshotManager(),
commitUser,
fileStoreTable.store.newIndexFileHandler,
numSparkPartitions,
numAssigners,
TaskContext.getPartitionId(),
targetBucketRowNumber
)

new Iterator[Row]() {
override def hasNext: Boolean = rowIterator.hasNext

override def next(): Row = {
val row = rowIterator.next
val sparkRow = new SparkRow(rowType, row)
val hash = rowPartitionKeyExtractor.trimmedPrimaryKey(sparkRow).hashCode
val partition = rowPartitionKeyExtractor.partition(sparkRow)
val bucket = assigner.assign(partition, hash)
val sparkInternalRow = encoderGroup.rowToInternal(row)
sparkInternalRow.setInt(bucketColIndex, bucket)
encoderGroup.internalToRow(sparkInternalRow)
}
}
}
}

case class UnawareBucketProcessor(bucketColIndex: Int, encoderGroup: EncoderSerDeGroup)
extends BucketProcessor {

def processPartition(rowIterator: Iterator[Row]): Iterator[Row] = {
new Iterator[Row] {
override def hasNext: Boolean = rowIterator.hasNext

override def next(): Row = {
val row = rowIterator.next
val sparkInternalRow = encoderGroup.rowToInternal(row)
sparkInternalRow.setInt(bucketColIndex, 0)
encoderGroup.internalToRow(sparkInternalRow)
}
}
}
}
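Taken together, a BucketProcessor is applied once per Spark partition. A hedged sketch of how a caller could wire CommonBucketProcessor into a DataFrame that already carries a placeholder bucket column (the function name, the input DataFrame, and bucketColIndex are assumptions, not code from this commit; EncoderSerDeGroup and the processors are assumed importable from org.apache.paimon.spark.commands):

import org.apache.paimon.table.FileStoreTable
import org.apache.spark.sql.{DataFrame, Dataset, Row}

def withAssignedBuckets(
    dfWithBucketCol: DataFrame,
    table: FileStoreTable,
    bucketColIndex: Int): Dataset[Row] = {
  val encoderGroup = EncoderSerDeGroup(dfWithBucketCol.schema)
  val processor = CommonBucketProcessor(table, bucketColIndex, encoderGroup)
  // processPartition rewrites the bucket column of every row in the partition.
  dfWithBucketCol.mapPartitions(iter => processor.processPartition(iter))(encoderGroup.encoder)
}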
@@ -20,39 +20,17 @@ package org.apache.paimon.spark.commands

import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
import org.apache.paimon.spark.SparkFilterConverter
import org.apache.paimon.table.{BucketMode, FileStoreTable}
import org.apache.paimon.table.sink.{CommitMessage, CommitMessageSerializer}
import org.apache.paimon.types.RowType

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.Utils.{normalizeExprs, translateFilter}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, PredicateHelper}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources.{AlwaysTrue, And, EqualNullSafe, Filter}

import java.io.IOException

/** Helper trait for all paimon commands. */
trait PaimonCommand extends WithFileStoreTable with PredicateHelper {

lazy val bucketMode: BucketMode = table match {
case fileStoreTable: FileStoreTable =>
fileStoreTable.bucketMode
case _ =>
BucketMode.FIXED
}

def deserializeCommitMessage(
serializer: CommitMessageSerializer,
bytes: Array[Byte]): CommitMessage = {
try {
serializer.deserialize(serializer.getVersion, bytes)
} catch {
case e: IOException =>
throw new RuntimeException("Failed to deserialize CommitMessage's object", e)
}
}
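deserializeCommitMessage wraps the versioned deserialization of the commit metadata produced on executors. A minimal driver-side sketch of that round trip, where serializedMessages is a hypothetical stand-in for the collected byte arrays (not code from this commit):

import org.apache.paimon.table.sink.{CommitMessage, CommitMessageSerializer}

// Turn executor-produced byte arrays back into CommitMessage objects on the driver.
def toCommitMessages(serializedMessages: Seq[Array[Byte]]): Seq[CommitMessage] = {
  val serializer = new CommitMessageSerializer()
  serializedMessages.map(bytes => serializer.deserialize(serializer.getVersion, bytes))
}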

protected def convertConditionToPaimonPredicate(
condition: Expression,
output: Seq[Attribute]): Predicate = {
@@ -98,15 +76,15 @@ trait PaimonCommand extends WithFileStoreTable with PredicateHelper {
}.toMap
}

- def splitConjunctiveFilters(filter: Filter): Seq[Filter] = {
+ private def splitConjunctiveFilters(filter: Filter): Seq[Filter] = {
filter match {
case And(filter1, filter2) =>
splitConjunctiveFilters(filter1) ++ splitConjunctiveFilters(filter2)
case other => other :: Nil
}
}

- def isNestedFilterInValue(value: Any): Boolean = {
+ private def isNestedFilterInValue(value: Any): Boolean = {
value.isInstanceOf[Filter]
}

