Skip to content

Commit

Permalink
Introduce actions, table feature, properties
Browse files Browse the repository at this point in the history
  • Loading branch information
prakharjain09 committed Aug 17, 2023
1 parent 3835df1 commit 6d6699a
Show file tree
Hide file tree
Showing 8 changed files with 288 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,8 @@ object Checkpoints extends DeltaLogging {
action
}
// commitInfo, cdc, remove.tags and remove.stats are not included in the checkpoint
.drop("commitInfo", "cdc")
// TODO: Add support for V2 Checkpoints here.
.drop("commitInfo", "cdc", "checkpointMetadata", "sidecarFile")
.withColumn("remove", col("remove").dropFields("tags", "stats"))

val chk = buildCheckpoint(base, snapshot)
Expand Down Expand Up @@ -684,3 +685,64 @@ object Checkpoints extends DeltaLogging {
} else Some(struct(partitionValues: _*).as(STRUCT_PARTITIONS_COL_NAME))
}
}

/** Definitions describing the file format of the top level file of a V2 checkpoint. */
object V2Checkpoint {

  /**
   * A format in which the top level file of a V2 checkpoint can be stored.
   *
   * @param name short identifier of the format; [[toFormat]] compares it against the end of a
   *             file name
   */
  sealed abstract class Format(val name: String) {
    /** The [[FileFormat]] instance associated with this checkpoint format. */
    def fileFormat: FileFormat
  }

  /**
   * Resolves the [[Format]] of a V2 checkpoint file from its file name.
   *
   * The check is a plain suffix comparison against the format names, and the json format is
   * tried before the parquet format.
   *
   * @param fileName name of the checkpoint file
   * @return the first [[Format]] whose name is a suffix of `fileName`
   * @throws IllegalStateException if `fileName` ends with none of the known format names
   */
  def toFormat(fileName: String): Format = {
    if (fileName.endsWith(Format.JSON.name)) {
      Format.JSON
    } else if (fileName.endsWith(Format.PARQUET.name)) {
      Format.PARQUET
    } else {
      throw new IllegalStateException(s"Unknown v2 checkpoint file format: ${fileName}")
    }
  }

  object Format {
    /** V2 checkpoint whose top level file is json. */
    object JSON extends Format("json") {
      override def fileFormat: FileFormat = DeltaLogFileIndex.CHECKPOINT_FILE_FORMAT_JSON
    }

    /** V2 checkpoint whose top level file is parquet. */
    object PARQUET extends Format("parquet") {
      override def fileFormat: FileFormat = DeltaLogFileIndex.CHECKPOINT_FILE_FORMAT_PARQUET
    }

    /** Every format that is valid for the top level file of a V2 checkpoint. */
    val ALL: Set[Format] = Set(JSON, PARQUET)

    /** The names of all the formats in [[ALL]]. */
    val ALL_AS_STRINGS: Set[String] = ALL.map(format => format.name)
  }
}

/** The checkpoint policies that can be configured for a table. */
object CheckpointPolicy {

  /**
   * A checkpoint policy.
   *
   * @param name identifier of the policy; this is also what `toString` returns
   */
  sealed abstract class Policy(val name: String) {
    /** Whether this policy needs V2 checkpoint support. `true` unless a policy overrides it. */
    def needsV2CheckpointSupport: Boolean = true

    override def toString: String = name
  }

  /**
   * Policy under which classic single file/multi-part checkpoints are written.
   * [[V2CheckpointTableFeature]] is not required for this checkpoint policy.
   */
  case object Classic extends Policy("classic") {
    override def needsV2CheckpointSupport: Boolean = false
  }

  /**
   * Policy under which V2 checkpoints are written.
   * This needs [[V2CheckpointTableFeature]] to be enabled on the table.
   */
  case object V2 extends Policy("v2")

  /** All checkpoint policies. */
  val ALL: Seq[Policy] = Seq(Classic, V2)

  /**
   * Converts a `name` String into a [[Policy]]. The comparison is an exact, case-sensitive
   * match against [[Policy.name]].
   *
   * @throws IllegalStateException if no policy in [[ALL]] has the given name
   */
  def fromName(name: String): Policy = ALL
    .collectFirst { case policy if policy.name == name => policy }
    .getOrElse(throw new IllegalStateException(s"Invalid policy $name"))
}
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,15 @@ trait DeltaConfigsBase extends DeltaLogging {
"must be Serializable"
)

/**
 * Table config that decides what kind of checkpoint gets written for a table.
 * The raw value is parsed with [[CheckpointPolicy.fromName]], defaults to the classic policy
 * and is accepted only when its name matches one of the policies in [[CheckpointPolicy.ALL]].
 */
val CHECKPOINT_POLICY = buildConfig[CheckpointPolicy.Policy](
  key = "checkpointPolicy-dev",
  defaultValue = CheckpointPolicy.Classic.name,
  fromString = rawValue => CheckpointPolicy.fromName(rawValue),
  validationFunction = policy => CheckpointPolicy.ALL.exists(known => known.name == policy.name),
  helpMessage = "can be one of the following: " +
    s"${CheckpointPolicy.Classic.name}, ${CheckpointPolicy.V2.name}")

/**
* Indicates whether Row Tracking is enabled on the table. When this flag is turned on, all rows
* are guaranteed to have Row IDs and Row Commit Versions assigned to them, and writers are
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ object DeltaLogFileIndex {

// File format instances kept on this object; each one is created lazily on first access.
// Judging by the names: JSON for commit files, and one format per supported checkpoint
// encoding (parquet and json).
lazy val COMMIT_FILE_FORMAT = new JsonFileFormat
lazy val CHECKPOINT_FILE_FORMAT_PARQUET = new ParquetFileFormat
// Separate instance from COMMIT_FILE_FORMAT even though both are JsonFileFormat.
lazy val CHECKPOINT_FILE_FORMAT_JSON = new JsonFileFormat

def apply(format: FileFormat, fs: FileSystem, paths: Seq[Path]): DeltaLogFileIndex = {
DeltaLogFileIndex(format, paths.map(fs.getFileStatus).toArray)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,8 @@ object TableFeature {
ColumnMappingTableFeature,
TimestampNTZTableFeature,
IcebergCompatV1TableFeature,
DeletionVectorsTableFeature)
DeletionVectorsTableFeature,
V2CheckpointTableFeature)
if (DeltaUtils.isTesting) {
features ++= Set(
TestLegacyWriterFeature,
Expand All @@ -344,6 +345,13 @@ object TableFeature {
// Row IDs are still under development and only available in testing.
RowTrackingFeature)
}
val exposeV2Checkpoints =
DeltaUtils.isTesting || SparkSession.getActiveSession.map { spark =>
spark.conf.get(DeltaSQLConf.EXPOSE_CHECKPOINT_V2_TABLE_FEATURE_FOR_TESTING)
}.getOrElse(false)
if (exposeV2Checkpoints) {
features += V2CheckpointTableFeature
}
val featureMap = features.map(f => f.name.toLowerCase(Locale.ROOT) -> f).toMap
require(features.size == featureMap.size, "Lowercase feature names must not duplicate.")
featureMap
Expand Down Expand Up @@ -513,6 +521,25 @@ object IcebergCompatV1TableFeature extends WriterFeature(name = "icebergCompatV1
}


/**
 * Table feature for V2 checkpoints, i.e. checkpoints with sidecars and the new format and
 * file naming scheme.
 *
 * The feature is still a work in progress, which is why its name carries the
 * `-under-development` suffix. It will be open sourced as soon as the work is complete.
 */
object V2CheckpointTableFeature
  extends ReaderWriterFeature(name = "v2Checkpoint-under-development")
  with FeatureAutomaticallyEnabledByMetadata {

  override def automaticallyUpdateProtocolOfExistingTables: Boolean = true

  /**
   * The feature is required exactly when the checkpoint policy configured in `metadata`
   * reports that it needs V2 checkpoint support.
   */
  override def metadataRequiresFeatureToBeEnabled(
      metadata: Metadata,
      spark: SparkSession): Boolean = {
    val configuredPolicy = DeltaConfigs.CHECKPOINT_POLICY.fromMetaData(metadata)
    configuredPolicy.needsV2CheckpointSupport
  }
}

/**
* Features below are for testing only, and are being registered to the system only in the testing
* environment. See [[TableFeature.allSupportedFeaturesMap]] for the registration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.commands.DeletionVectorUtils
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.{JsonUtils, Utils => DeltaUtils}
import org.apache.spark.sql.delta.util.FileNames
import com.fasterxml.jackson.annotation._
import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.databind._
import com.fasterxml.jackson.databind.annotation.{JsonDeserialize, JsonSerialize}
import com.fasterxml.jackson.databind.node.ObjectNode

import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Column, Encoder, SparkSession}
Expand Down Expand Up @@ -1160,6 +1161,92 @@ object CommitInfo {

}

/**
 * A marker trait for actions which can only be part of a checkpoint.
 * The trait is sealed, so every implementation has to be declared in this file.
 */
sealed trait CheckpointOnlyAction extends Action

/**
 * An [[Action]] holding the information about a single sidecar file.
 *
 * @param path sidecar path relative to the sidecar directory of the log, i.e. the directory
 *             returned by `FileNames.sidecarDirPath`
 * @param sizeInBytes size of the sidecar file in bytes
 * @param modificationTime modification time of the sidecar file
 * @param tags attributes of the sidecar file. Defaults to null, which is semantically the same
 *             as an empty Map; null is used so that the field is absent from the generated
 *             json.
 */
case class SidecarFile(
    path: String,
    sizeInBytes: Long,
    modificationTime: Long,
    tags: Map[String, String] = null)
  extends Action with CheckpointOnlyAction {

  override def wrap: SingleAction = SingleAction(sidecar = this)

  /**
   * Builds a [[FileStatus]] for this sidecar file by resolving `path` against the sidecar
   * directory of `logPath`. Only the length, the modification time and the path carry
   * information; the entry is marked as a regular file and the remaining fields are zero.
   */
  def toFileStatus(logPath: Path): FileStatus = {
    val sidecarDir = FileNames.sidecarDirPath(logPath)
    new FileStatus(
      sizeInBytes,
      false, // not a directory
      0, // block replication
      0, // block size
      modificationTime,
      new Path(sidecarDir, path))
  }
}

/**
 * Factory methods that build a [[SidecarFile]] from a file status object. Both variants keep
 * only the final component (the file name) of the status' path and leave `tags` at its default.
 */
object SidecarFile {

  /** Creates a [[SidecarFile]] from a [[SerializableFileStatus]]. */
  def apply(fileStatus: SerializableFileStatus): SidecarFile = {
    val fileName = fileStatus.getHadoopPath.getName
    SidecarFile(
      path = fileName,
      sizeInBytes = fileStatus.length,
      modificationTime = fileStatus.modificationTime)
  }

  /** Creates a [[SidecarFile]] from a Hadoop [[FileStatus]]. */
  def apply(fileStatus: FileStatus): SidecarFile = {
    val fileName = fileStatus.getPath.getName
    SidecarFile(
      path = fileName,
      sizeInBytes = fileStatus.getLen,
      modificationTime = fileStatus.getModificationTime)
  }
}

/**
 * An [[Action]] holding information about a Delta checkpoint. It is a
 * [[CheckpointOnlyAction]], so it will only ever be part of checkpoints.
 *
 * @param version version of the checkpoint
 * @param tags attributes of the checkpoint. Defaults to null, which is semantically the same
 *             as an empty Map; null is used so that the field is absent from the generated
 *             json.
 */
case class CheckpointMetadata(
    version: Long,
    tags: Map[String, String] = null)
  extends Action with CheckpointOnlyAction {

  override def wrap: SingleAction = SingleAction(checkpointMetadata = this)
}

object CheckpointMetadata {

  /**
   * Creates a [[CheckpointMetadata]] whose tags carry the given sidecar statistics.
   *
   * The three numeric values are stored as strings under the keys defined in [[Tags]]. The
   * json of the sidecar file schema is added as a further tag only when a schema is provided.
   *
   * @param version version of the checkpoint
   * @param sidecarNumActions value stored under [[Tags.SIDECAR_NUM_ACTIONS]]
   * @param sidecarSizeInBytes value stored under [[Tags.SIDECAR_SIZE_IN_BYTES]]
   * @param numOfAddFiles value stored under [[Tags.NUM_OF_ADD_FILES]]
   * @param sidecarFileSchemaOpt optional schema, stored as json under
   *                             [[Tags.SIDECAR_FILE_SCHEMA]]
   */
  def apply(
      version: Long,
      sidecarNumActions: Long,
      sidecarSizeInBytes: Long,
      numOfAddFiles: Long,
      sidecarFileSchemaOpt: Option[StructType]): CheckpointMetadata = {
    val statsTags = Map(
      Tags.SIDECAR_NUM_ACTIONS.name -> sidecarNumActions.toString,
      Tags.SIDECAR_SIZE_IN_BYTES.name -> sidecarSizeInBytes.toString,
      Tags.NUM_OF_ADD_FILES.name -> numOfAddFiles.toString)
    // The schema tag, when present, goes after the statistics tags.
    val allTags = sidecarFileSchemaOpt.fold(statsTags) { schema =>
      statsTags + (Tags.SIDECAR_FILE_SCHEMA.name -> schema.json)
    }
    CheckpointMetadata(version = version, tags = allTags)
  }

  /** The tag keys written by the `apply` method above. */
  object Tags {
    /** A tag key; `name` is the string used as the key in the tags map. */
    sealed abstract class KeyType(val name: String)

    object SIDECAR_NUM_ACTIONS extends KeyType("sidecarNumActions")
    object SIDECAR_SIZE_IN_BYTES extends KeyType("sidecarSizeInBytes")
    object NUM_OF_ADD_FILES extends KeyType("numOfAddFiles")
    object SIDECAR_FILE_SCHEMA extends KeyType("sidecarFileSchema")
  }
}

/** A serialization helper to create a common action envelope. */
case class SingleAction(
txn: SetTransaction = null,
Expand All @@ -1168,6 +1255,8 @@ case class SingleAction(
metaData: Metadata = null,
protocol: Protocol = null,
cdc: AddCDCFile = null,
checkpointMetadata: CheckpointMetadata = null,
sidecar: SidecarFile = null,
domainMetadata: DomainMetadata = null,
commitInfo: CommitInfo = null) {

Expand All @@ -1184,6 +1273,10 @@ case class SingleAction(
protocol
} else if (cdc != null) {
cdc
} else if (sidecar != null) {
sidecar
} else if (checkpointMetadata != null) {
checkpointMetadata
} else if (domainMetadata != null) {
domainMetadata
} else if (commitInfo != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,35 @@ trait DeltaSQLConfBase {
.checkValue(_ > 0, "partSize has to be positive")
.createOptional

////////////////////////////////////
// Checkpoint V2 Specific Configs
////////////////////////////////////

/**
 * Internal, optional string conf selecting the file format of the top level file of a V2
 * checkpoint. Only the values "json" and "parquet" pass validation; there is no default
 * value, so the conf is absent unless it is set explicitly.
 */
val CHECKPOINT_V2_TOP_LEVEL_FILE_FORMAT =
buildConf("checkpointV2.topLevelFileFormat")
.internal()
.doc(
"""
|The file format to use for the top level checkpoint file in V2 Checkpoints.
| This can be set to either json or parquet. The appropriate format will be
| picked automatically if this config is not specified.
|""".stripMargin)
.stringConf
// Same two names as the V2 checkpoint formats "json" and "parquet".
.checkValues(Set("json", "parquet"))
.createOptional

/**
 * Internal boolean conf, `false` by default, that controls whether the v2 checkpoints table
 * feature is exposed. V2 checkpoints are still in development, so this is meant for
 * testing/benchmarking only.
 */
val EXPOSE_CHECKPOINT_V2_TABLE_FEATURE_FOR_TESTING =
  buildConf("checkpointV2.exposeTableFeatureForTesting")
    .internal()
    .doc(
      """
        |This conf controls whether v2 checkpoints table feature is exposed or not. Note that
        | v2 checkpoints are in development and this config should be used only for
        | testing/benchmarking.
        |""".stripMargin)
    .booleanConf
    .createWithDefault(false)

val DELTA_WRITE_CHECKSUM_ENABLED =
buildConf("writeChecksumFile.enabled")
.doc("Whether the checksum file can be written.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.apache.spark.sql.delta.util

import java.util.UUID

import org.apache.hadoop.fs.{FileStatus, Path}

/** Helper for creating file names for specific commits / checkpoints. */
Expand Down Expand Up @@ -146,4 +148,27 @@ object FileNames {
val DELTA, CHECKPOINT, CHECKSUM, OTHER = Value
}


/**
 * File path for a new V2 Checkpoint Json file.
 *
 * The file name is the version zero-padded to 20 digits, followed by `.checkpoint.`, a freshly
 * generated random UUID and the `.json` extension, so every call yields a different path.
 *
 * @param path directory the file is placed in
 * @param version version of the checkpoint
 */
def newV2CheckpointJsonFile(path: Path, version: Long): Path = {
  val uuid = UUID.randomUUID.toString
  new Path(path, f"$version%020d.checkpoint.$uuid.json")
}

/**
 * File path for a new V2 Checkpoint Parquet file.
 *
 * The file name is the version zero-padded to 20 digits, followed by `.checkpoint.`, a freshly
 * generated random UUID and the `.parquet` extension, so every call yields a different path.
 *
 * @param path directory the file is placed in
 * @param version version of the checkpoint
 */
def newV2CheckpointParquetFile(path: Path, version: Long): Path = {
  val uuid = UUID.randomUUID.toString
  new Path(path, f"$version%020d.checkpoint.$uuid.parquet")
}

/**
 * File path for a V2 Checkpoint's Sidecar file.
 *
 * The file is placed in the sidecar directory of `logPath`. Its name consists of the version
 * zero-padded to 20 digits, `.checkpoint.`, the current part and the number of parts (each
 * zero-padded to 10 digits), a freshly generated random UUID and the `.parquet` extension.
 *
 * @param logPath path of the log directory that contains the sidecar directory
 * @param version version of the checkpoint
 * @param numParts total number of parts, encoded in the file name
 * @param currentPart the part this file represents, encoded in the file name
 */
def newV2CheckpointSidecarFile(
    logPath: Path,
    version: Long,
    numParts: Int,
    currentPart: Int): Path = {
  val uniqueId = UUID.randomUUID.toString
  val fileName =
    f"$version%020d.checkpoint.$currentPart%010d.$numParts%010d.$uniqueId.parquet"
  new Path(sidecarDirPath(logPath), fileName)
}

/** Name of the sub-directory, relative to the log path, that holds the sidecar files. */
val SIDECAR_SUBDIR = "_sidecars"
/** Returns the path to the sidecar directory, i.e. the [[SIDECAR_SUBDIR]] child of `logPath`. */
def sidecarDirPath(logPath: Path): Path = new Path(logPath, SIDECAR_SUBDIR)
}
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,45 @@ class ActionSerializerSuite extends QueryTest with SharedSparkSession with Delta
}
}

test("CheckpointMetadata - serialize/deserialize") {
  val withNullTags = CheckpointMetadata(version = 1, tags = null)
  val withEmptyTags = withNullTags.copy(tags = Map())
  val withTags = withNullTags.copy(
    tags = Map("k1" -> "v1", "schema" -> """{"type":"struct","fields":[]}"""))

  // Null tags are left out of the json entirely, while an empty map is written as `{}`.
  assert(withNullTags.json === """{"checkpointMetadata":{"version":1}}""")
  assert(withEmptyTags.json === """{"checkpointMetadata":{"version":1,"tags":{}}}""")
  // The schema is itself a json string, so its quotes show up escaped in the tag value.
  assert(withTags.json ===
    """{"checkpointMetadata":{"version":1,""" +
      """"tags":{"k1":"v1","schema":"{\"type\":\"struct\",\"fields\":[]}"}}}""")

  // Every variant must survive a json round trip through the SingleAction envelope.
  for (metadata <- Seq(withNullTags, withEmptyTags, withTags)) {
    assert(metadata === JsonUtils.fromJson[SingleAction](metadata.json).unwrap)
  }
}

test("SidecarFile - serialize/deserialize") {
  val withNullTags =
    SidecarFile(path = "/t1/p1", sizeInBytes = 1L, modificationTime = 3, tags = null)
  val withEmptyTags = withNullTags.copy(tags = Map())
  val withTags = withEmptyTags.copy(
    tags = Map("k1" -> "v1", "schema" -> """{"type":"struct","fields":[]}"""))

  // Null tags are left out of the json entirely, while an empty map is written as `{}`.
  assert(withNullTags.json ===
    """{"sidecar":{"path":"/t1/p1","sizeInBytes":1,"modificationTime":3}}""")
  assert(withEmptyTags.json ===
    """{"sidecar":{"path":"/t1/p1","sizeInBytes":1,""" +
      """"modificationTime":3,"tags":{}}}""")
  // The schema is itself a json string, so its quotes show up escaped in the tag value.
  assert(withTags.json ===
    """{"sidecar":{"path":"/t1/p1","sizeInBytes":1,"modificationTime":3,""" +
      """"tags":{"k1":"v1","schema":"{\"type\":\"struct\",\"fields\":[]}"}}}""")

  // Every variant must survive a json round trip through the SingleAction envelope.
  for (file <- Seq(withNullTags, withEmptyTags, withTags)) {
    assert(file === JsonUtils.fromJson[SingleAction](file.json).unwrap)
  }
}

testActionSerDe(
"AddCDCFile (without tags) - json serialization/deserialization",
Expand Down

0 comments on commit 6d6699a

Please sign in to comment.