From 37458a9c54b827c556f68638005dbf829c36d373 Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Thu, 17 Aug 2023 15:13:21 -0700 Subject: [PATCH] Introduce actions, table feature, properties --- .../apache/spark/sql/delta/Checkpoints.scala | 64 ++++++++++++- .../apache/spark/sql/delta/DeltaConfig.scala | 9 ++ .../spark/sql/delta/DeltaLogFileIndex.scala | 1 + .../apache/spark/sql/delta/TableFeature.scala | 29 +++++- .../spark/sql/delta/actions/actions.scala | 95 ++++++++++++++++++- .../sql/delta/sources/DeltaSQLConf.scala | 29 ++++++ .../spark/sql/delta/util/FileNames.scala | 25 +++++ .../sql/delta/ActionSerializerSuite.scala | 39 ++++++++ 8 files changed, 288 insertions(+), 3 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala index 700a5a82f2..a21d8b09af 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala @@ -458,7 +458,8 @@ object Checkpoints extends DeltaLogging { action } // commitInfo, cdc, remove.tags and remove.stats are not included in the checkpoint - .drop("commitInfo", "cdc") + // TODO: Add support for V2 Checkpoints here. + .drop("commitInfo", "cdc", "checkpointMetadata", "sidecar") .withColumn("remove", col("remove").dropFields("tags", "stats")) val chk = buildCheckpoint(base, snapshot) @@ -684,3 +685,64 @@ object Checkpoints extends DeltaLogging { } else Some(struct(partitionValues: _*).as(STRUCT_PARTITIONS_COL_NAME)) } } + +object V2Checkpoint { + /** Format for V2 Checkpoints */ + sealed abstract class Format(val name: String) { + def fileFormat: FileFormat + } + + def toFormat(fileName: String): Format = fileName match { + case _ if fileName.endsWith(Format.JSON.name) => Format.JSON + case _ if fileName.endsWith(Format.PARQUET.name) => Format.PARQUET + case _ => throw new IllegalStateException(s"Unknown v2 checkpoint file format: ${fileName}") + } + + object Format { + /** json v2 checkpoint */ + object JSON extends Format("json") { + override def fileFormat: FileFormat = DeltaLogFileIndex.CHECKPOINT_FILE_FORMAT_JSON + } + + /** parquet v2 checkpoint */ + object PARQUET extends Format("parquet") { + override def fileFormat: FileFormat = DeltaLogFileIndex.CHECKPOINT_FILE_FORMAT_PARQUET + } + + /** All valid formats for the top level file of v2 checkpoints. */ + val ALL: Set[Format] = Set(Format.JSON, Format.PARQUET) + + /** The string representations of all the valid formats. */ + val ALL_AS_STRINGS: Set[String] = ALL.map(_.name) + } +} + +object CheckpointPolicy { + + sealed abstract class Policy(val name: String) { + override def toString: String = name + def needsV2CheckpointSupport: Boolean = true + } + + /** + * Write classic single file/multi-part checkpoints when this policy is enabled. + * Note that [[V2CheckpointTableFeature]] is not required for this checkpoint policy. + */ + case object Classic extends Policy("classic") { + override def needsV2CheckpointSupport: Boolean = false + } + + /** + * Write V2 checkpoints when this policy is enabled. + * This needs [[V2CheckpointTableFeature]] to be enabled on the table. 
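+   *
+   * Illustrative usage only (not part of this change): assuming the table-property key is the
+   * `delta.`-prefixed form of [[DeltaConfigs.CHECKPOINT_POLICY]], a table could opt in with:
+   * {{{
+   *   // Hypothetical example; the table feature must be supported on the table first.
+   *   spark.sql("ALTER TABLE tbl SET TBLPROPERTIES ('delta.checkpointPolicy-dev' = 'v2')")
+   * }}}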
+   */
+  case object V2 extends Policy("v2")
+
+  /** All checkpoint policies */
+  val ALL: Seq[Policy] = Seq(Classic, V2)
+
+  /** Converts a `name` String into a [[Policy]] */
+  def fromName(name: String): Policy = ALL.find(_.name == name).getOrElse {
+    throw new IllegalStateException(s"Invalid policy $name")
+  }
+}
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala
index 0f4b991e25..2b9cf38ab3 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala
@@ -624,6 +624,15 @@ trait DeltaConfigsBase extends DeltaLogging {
     "must be Serializable"
   )
 
+  /** Policy to decide what kind of checkpoint to write to a table. */
+  val CHECKPOINT_POLICY = buildConfig[CheckpointPolicy.Policy](
+    key = "checkpointPolicy-dev",
+    defaultValue = CheckpointPolicy.Classic.name,
+    fromString = str => CheckpointPolicy.fromName(str),
+    validationFunction = (v => CheckpointPolicy.ALL.exists(_.name == v.name)),
+    helpMessage = s"can be one of the " +
+      s"following: ${CheckpointPolicy.Classic.name}, ${CheckpointPolicy.V2.name}")
+
   /**
    * Indicates whether Row Tracking is enabled on the table. When this flag is turned on, all rows
    * are guaranteed to have Row IDs and Row Commit Versions assigned to them, and writers are
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLogFileIndex.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLogFileIndex.scala
index 164fe64e86..de111aadd3 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLogFileIndex.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLogFileIndex.scala
@@ -89,6 +89,7 @@ object DeltaLogFileIndex {
   lazy val COMMIT_FILE_FORMAT = new JsonFileFormat
   lazy val CHECKPOINT_FILE_FORMAT_PARQUET = new ParquetFileFormat
+  lazy val CHECKPOINT_FILE_FORMAT_JSON = new JsonFileFormat
 
   def apply(format: FileFormat, fs: FileSystem, paths: Seq[Path]): DeltaLogFileIndex = {
     DeltaLogFileIndex(format, paths.map(fs.getFileStatus).toArray)
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
index 427be93b6f..61b7fbef34 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
@@ -324,7 +324,8 @@ object TableFeature {
       ColumnMappingTableFeature,
       TimestampNTZTableFeature,
       IcebergCompatV1TableFeature,
-      DeletionVectorsTableFeature)
+      DeletionVectorsTableFeature,
+      V2CheckpointTableFeature)
     if (DeltaUtils.isTesting) {
       features ++= Set(
         TestLegacyWriterFeature,
@@ -344,6 +345,13 @@ object TableFeature {
         // Row IDs are still under development and only available in testing.
         RowTrackingFeature)
     }
+    val exposeV2Checkpoints =
+      DeltaUtils.isTesting || SparkSession.getActiveSession.map { spark =>
+        spark.conf.get(DeltaSQLConf.EXPOSE_CHECKPOINT_V2_TABLE_FEATURE_FOR_TESTING)
+      }.getOrElse(false)
+    if (exposeV2Checkpoints) {
+      features += V2CheckpointTableFeature
+    }
     val featureMap = features.map(f => f.name.toLowerCase(Locale.ROOT) -> f).toMap
     require(features.size == featureMap.size, "Lowercase feature names must not duplicate.")
     featureMap
@@ -513,6 +521,25 @@ object IcebergCompatV1TableFeature extends WriterFeature(name = "icebergCompatV1
 }
 
+/**
+ * The V2 Checkpoint table feature is for checkpoints with sidecars and the new format and
+ * file naming scheme.
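+ *
+ * For illustration, a v2 checkpoint for table version 10 is expected to consist of a top-level
+ * file such as `00000000000000000010.checkpoint.<uuid>.json` (or `.parquet`) carrying a
+ * [[CheckpointMetadata]] action and [[SidecarFile]] actions, with the referenced sidecar parquet
+ * files stored under `_delta_log/_sidecars/` (see the [[FileNames]] helpers in this change).
+ *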
+ * This is still a WIP feature. It will be open-sourced as soon as the work is complete.
+ */
+object V2CheckpointTableFeature
+  extends ReaderWriterFeature(name = "v2Checkpoint-under-development")
+  with FeatureAutomaticallyEnabledByMetadata {
+
+  override def automaticallyUpdateProtocolOfExistingTables: Boolean = true
+
+  override def metadataRequiresFeatureToBeEnabled(
+      metadata: Metadata,
+      spark: SparkSession): Boolean = {
+    DeltaConfigs.CHECKPOINT_POLICY.fromMetaData(metadata).needsV2CheckpointSupport
+  }
+}
+
 /**
  * Features below are for testing only, and are being registered to the system only in the testing
  * environment. See [[TableFeature.allSupportedFeaturesMap]] for the registration.
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala b/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
index ec6ef05b5a..86f2c921e0 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.commands.DeletionVectorUtils
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.util.{JsonUtils, Utils => DeltaUtils}
+import org.apache.spark.sql.delta.util.FileNames
 import com.fasterxml.jackson.annotation._
 import com.fasterxml.jackson.annotation.JsonInclude.Include
 import com.fasterxml.jackson.core.JsonGenerator
@@ -37,7 +38,7 @@ import com.fasterxml.jackson.databind._
 import com.fasterxml.jackson.databind.annotation.{JsonDeserialize, JsonSerialize}
 import com.fasterxml.jackson.databind.node.ObjectNode
 
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Column, Encoder, SparkSession}
@@ -1160,6 +1161,92 @@ object CommitInfo {
 }
 
+/** A trait to represent actions which can only be part of a checkpoint. */
+sealed trait CheckpointOnlyAction extends Action
+
+/**
+ * An [[Action]] containing information about a sidecar file.
+ *
+ * @param path - sidecar path relative to the `_delta_log/_sidecars` directory
+ * @param sizeInBytes - size in bytes of the sidecar file
+ * @param modificationTime - modification time of the sidecar file
+ * @param tags - attributes of the sidecar file, defaults to null (which is semantically the same
+ *               as an empty Map). This is kept null to ensure that the field is not present in
+ *               the generated JSON.
+ */
+case class SidecarFile(
+    path: String,
+    sizeInBytes: Long,
+    modificationTime: Long,
+    tags: Map[String, String] = null)
+  extends Action with CheckpointOnlyAction {
+
+  override def wrap: SingleAction = SingleAction(sidecar = this)
+
+  def toFileStatus(logPath: Path): FileStatus = {
+    val partFilePath = new Path(FileNames.sidecarDirPath(logPath), path)
+    new FileStatus(sizeInBytes, false, 0, 0, modificationTime, partFilePath)
+  }
+}
+
+object SidecarFile {
+  def apply(fileStatus: SerializableFileStatus): SidecarFile = {
+    SidecarFile(fileStatus.getHadoopPath.getName, fileStatus.length, fileStatus.modificationTime)
+  }
+
+  def apply(fileStatus: FileStatus): SidecarFile = {
+    SidecarFile(fileStatus.getPath.getName, fileStatus.getLen, fileStatus.getModificationTime)
+  }
+}
+
+/**
+ * Holds information about the Delta Checkpoint. This action will only be part of checkpoints.
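+ *
+ * An illustrative serialized form (tag keys come from [[CheckpointMetadata.Tags]]; tag values
+ * are strings), matching the shape exercised in ActionSerializerSuite:
+ * {{{
+ *   {"checkpointMetadata":{"version":1,"tags":{"sidecarNumActions":"100"}}}
+ * }}}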
+ *
+ * @param version version of the checkpoint
+ * @param tags attributes of the checkpoint, defaults to null (which is semantically the same as
+ *             an empty Map). This is kept null to ensure that the field is not present in the
+ *             generated JSON.
+ */
+case class CheckpointMetadata(
+    version: Long,
+    tags: Map[String, String] = null)
+  extends Action with CheckpointOnlyAction {
+
+  import CheckpointMetadata.Tags
+  override def wrap: SingleAction = SingleAction(checkpointMetadata = this)
+}
+
+object CheckpointMetadata {
+
+  def apply(
+      version: Long,
+      sidecarNumActions: Long,
+      sidecarSizeInBytes: Long,
+      numOfAddFiles: Long,
+      sidecarFileSchemaOpt: Option[StructType]): CheckpointMetadata = {
+    val tagMapWithSchema = sidecarFileSchemaOpt
+      .map(schema => Map(Tags.SIDECAR_FILE_SCHEMA.name -> schema.json))
+      .getOrElse(Map.empty)
+    CheckpointMetadata(
+      version = version,
+      tags = Map(
+        Tags.SIDECAR_NUM_ACTIONS.name -> sidecarNumActions.toString,
+        Tags.SIDECAR_SIZE_IN_BYTES.name -> sidecarSizeInBytes.toString,
+        Tags.NUM_OF_ADD_FILES.name -> numOfAddFiles.toString
+      ) ++ tagMapWithSchema
+    )
+  }
+
+  object Tags {
+    sealed abstract class KeyType(val name: String)
+
+    object SIDECAR_NUM_ACTIONS extends KeyType("sidecarNumActions")
+    object SIDECAR_SIZE_IN_BYTES extends KeyType("sidecarSizeInBytes")
+    object NUM_OF_ADD_FILES extends KeyType("numOfAddFiles")
+    object SIDECAR_FILE_SCHEMA extends KeyType("sidecarFileSchema")
+  }
+}
+
 /** A serialization helper to create a common action envelope. */
 case class SingleAction(
     txn: SetTransaction = null,
@@ -1168,6 +1255,8 @@ case class SingleAction(
     metaData: Metadata = null,
     protocol: Protocol = null,
     cdc: AddCDCFile = null,
+    checkpointMetadata: CheckpointMetadata = null,
+    sidecar: SidecarFile = null,
     domainMetadata: DomainMetadata = null,
     commitInfo: CommitInfo = null) {
 
@@ -1184,6 +1273,10 @@ case class SingleAction(
       protocol
     } else if (cdc != null) {
       cdc
+    } else if (sidecar != null) {
+      sidecar
+    } else if (checkpointMetadata != null) {
+      checkpointMetadata
     } else if (domainMetadata != null) {
       domainMetadata
     } else if (commitInfo != null) {
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index b5273f6954..20662e5e57 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -572,6 +572,35 @@ trait DeltaSQLConfBase {
       .checkValue(_ > 0, "partSize has to be positive")
       .createOptional
 
+  ////////////////////////////////////
+  // Checkpoint V2 Specific Configs
+  ////////////////////////////////////
+
+  val CHECKPOINT_V2_TOP_LEVEL_FILE_FORMAT =
+    buildConf("checkpointV2.topLevelFileFormat")
+      .internal()
+      .doc(
+        """
+          |The file format to use for the top level checkpoint file in V2 Checkpoints.
+          | This can be set to either json or parquet. The appropriate format will be
+          | picked automatically if this config is not specified.
+          |""".stripMargin)
+      .stringConf
+      .checkValues(Set("json", "parquet"))
+      .createOptional
+
+  val EXPOSE_CHECKPOINT_V2_TABLE_FEATURE_FOR_TESTING =
+    buildConf("checkpointV2.exposeTableFeatureForTesting")
+      .internal()
+      .doc(
+        """
+          |This conf controls whether the v2 checkpoints table feature is exposed. Note that
+          | v2 checkpoints are in development and this config should be used only for
+          | testing/benchmarking.
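+          | For example, a test could enable it via spark.conf.set(
+          |   "spark.databricks.delta.checkpointV2.exposeTableFeatureForTesting", "true").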
+ |""".stripMargin) + .booleanConf + .createWithDefault(false) + val DELTA_WRITE_CHECKSUM_ENABLED = buildConf("writeChecksumFile.enabled") .doc("Whether the checksum file can be written.") diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/util/FileNames.scala b/spark/src/main/scala/org/apache/spark/sql/delta/util/FileNames.scala index a01e844b44..f2ec30f7d4 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/util/FileNames.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/util/FileNames.scala @@ -16,6 +16,8 @@ package org.apache.spark.sql.delta.util +import java.util.UUID + import org.apache.hadoop.fs.{FileStatus, Path} /** Helper for creating file names for specific commits / checkpoints. */ @@ -146,4 +148,27 @@ object FileNames { val DELTA, CHECKPOINT, CHECKSUM, OTHER = Value } + + /** File path for a new V2 Checkpoint Json file */ + def newV2CheckpointJsonFile(path: Path, version: Long): Path = + new Path(path, f"$version%020d.checkpoint.${UUID.randomUUID.toString}.json") + + /** File path for a new V2 Checkpoint Parquet file */ + def newV2CheckpointParquetFile(path: Path, version: Long): Path = + new Path(path, f"$version%020d.checkpoint.${UUID.randomUUID.toString}.parquet") + + /** File path for a V2 Checkpoint's Sidecar file */ + def newV2CheckpointSidecarFile( + logPath: Path, + version: Long, + numParts: Int, + currentPart: Int): Path = { + val basePath = sidecarDirPath(logPath) + val uuid = UUID.randomUUID.toString + new Path(basePath, f"$version%020d.checkpoint.$currentPart%010d.$numParts%010d.$uuid.parquet") + } + + val SIDECAR_SUBDIR = "_sidecars" + /** Returns path to the sidecar directory */ + def sidecarDirPath(logPath: Path): Path = new Path(logPath, SIDECAR_SUBDIR) } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/ActionSerializerSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/ActionSerializerSuite.scala index b13180ef8d..dd7ad12e7f 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/ActionSerializerSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/ActionSerializerSuite.scala @@ -382,6 +382,45 @@ class ActionSerializerSuite extends QueryTest with SharedSparkSession with Delta } } + test("CheckpointMetadata - serialize/deserialize") { + val m1 = CheckpointMetadata(version = 1, tags = null) // tags are null + val m2 = m1.copy(tags = Map()) // tags are empty + val m3 = m1.copy( // tags are non empty + tags = Map("k1" -> "v1", "schema" -> """{"type":"struct","fields":[]}""") + ) + + assert(m1.json === """{"checkpointMetadata":{"version":1}}""") + assert(m2.json === """{"checkpointMetadata":{"version":1,"tags":{}}}""") + assert(m3.json === + """{"checkpointMetadata":{"version":1,""" + + """"tags":{"k1":"v1","schema":"{\"type\":\"struct\",\"fields\":[]}"}}}""") + + Seq(m1, m2, m3).foreach { metadata => + assert(metadata === JsonUtils.fromJson[SingleAction](metadata.json).unwrap) + } + } + + test("SidecarFile - serialize/deserialize") { + val f1 = // tags are null + SidecarFile(path = "/t1/p1", sizeInBytes = 1L, modificationTime = 3, tags = null) + val f2 = f1.copy(tags = Map()) // tags are empty + val f3 = f2.copy( // tags are non empty + tags = Map("k1" -> "v1", "schema" -> """{"type":"struct","fields":[]}""") + ) + + assert(f1.json === + """{"sidecar":{"path":"/t1/p1","sizeInBytes":1,"modificationTime":3}}""") + assert(f2.json === + """{"sidecar":{"path":"/t1/p1","sizeInBytes":1,""" + + """"modificationTime":3,"tags":{}}}""") + assert(f3.json === + 
"""{"sidecar":{"path":"/t1/p1","sizeInBytes":1,"modificationTime":3,""" + + """"tags":{"k1":"v1","schema":"{\"type\":\"struct\",\"fields\":[]}"}}}""".stripMargin) + + Seq(f1, f2, f3).foreach { file => + assert(file === JsonUtils.fromJson[SingleAction](file.json).unwrap) + } + } testActionSerDe( "AddCDCFile (without tags) - json serialization/deserialization",