From cea116b05a94ef74f7e7f81fdf96251036691c40 Mon Sep 17 00:00:00 2001
From: micheal-o
Date: Wed, 30 Apr 2025 22:54:11 -0700
Subject: [PATCH 1/5] file checksum

---
 .../resources/error/error-conditions.json     |   7 +
 .../org/apache/spark/internal/LogKey.scala    |   1 +
 .../sql/errors/QueryExecutionErrors.scala     |  17 +
 .../apache/spark/sql/internal/SQLConf.scala   |  11 +
 .../streaming/CheckpointFileManager.scala     |   2 +
 .../ChecksumCheckpointFileManager.scala       | 510 ++++++++++++++++++
 .../state/HDFSBackedStateStoreProvider.scala  |  52 +-
 .../execution/streaming/state/RocksDB.scala   |  18 +-
 .../streaming/state/RocksDBFileManager.scala  |  53 +-
 .../streaming/state/StateStore.scala          |  51 +-
 .../streaming/state/StateStoreChangelog.scala |  11 +-
 .../streaming/state/StateStoreConf.scala      |   3 +
 .../CheckpointFileManagerSuite.scala          |   5 +-
 .../ChecksumCheckpointFileManagerSuite.scala  | 207 +++++++
 ...ailureInjectionCheckpointFileManager.scala |   4 +-
 .../state/RocksDBStateStoreSuite.scala        |   8 +
 .../streaming/state/StateStoreSuite.scala     | 219 +++++++-
 17 files changed, 1153 insertions(+), 26 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManager.scala
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManagerSuite.scala

diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 3d7977673a3fc..5d19bbc7939db 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -536,6 +536,13 @@
     ],
     "sqlState" : "42P08"
   },
+  "CHECKPOINT_FILE_CHECKSUM_VERIFICATION_FAILED" : {
+    "message" : [
+      "Checksum verification failed, the file may be corrupted. File: <fileName>",
+      "Expected (file size: <expectedSize>, checksum: <expectedChecksum>), Computed (file size: <computedSize>, checksum: <computedChecksum>)."
+ ], + "sqlState" : "XX000" + }, "CHECKPOINT_RDD_BLOCK_ID_NOT_FOUND" : { "message" : [ "Checkpoint block not found!", diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index 1f997592dbfb7..d0792c0e99884 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -126,6 +126,7 @@ private[spark] object LogKeys { case object CHECKPOINT_PATH extends LogKey case object CHECKPOINT_ROOT extends LogKey case object CHECKPOINT_TIME extends LogKey + case object CHECKSUM extends LogKey case object CHOSEN_WATERMARK extends LogKey case object CLASSIFIER extends LogKey case object CLASS_LOADER extends LogKey diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index d0ece5baff435..11abaf3979985 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -2622,6 +2622,23 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE cause = null) } + def checkpointFileChecksumVerificationFailed( + file: Path, + expectedSize: Long, + expectedChecksum: String, + computedSize: Long, + computedChecksum: String): Throwable = { + new SparkException( + errorClass = "CHECKPOINT_FILE_CHECKSUM_VERIFICATION_FAILED", + messageParameters = Map( + "fileName" -> file.toString, + "expectedSize" -> expectedSize.toString, + "expectedChecksum" -> expectedChecksum, + "computedSize" -> computedSize.toString, + "computedChecksum" -> computedChecksum), + cause = null) + } + def cannotReadCheckpoint(expectedVersion: String, actualVersion: String): Throwable = { new SparkException ( errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_CHECKPOINT", diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index b27ce8eeaec33..c715daeec5975 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3126,6 +3126,15 @@ object SQLConf { .booleanConf .createWithDefault(true) + val STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED = + buildConf("spark.sql.streaming.checkpoint.fileChecksum.enabled") + .internal() + .doc("When true, checksum would be generated and verified for some checkpoint files. 
" + + "This is used to detect file corruption.") + .version("4.1.0") + .booleanConf + .createWithDefault(true) + val PARALLEL_FILE_LISTING_IN_STATS_COMPUTATION = buildConf("spark.sql.statistics.parallelFileListingInStatsComputation.enabled") .internal() @@ -6044,6 +6053,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def checkpointLocation: Option[String] = getConf(CHECKPOINT_LOCATION) + def checkpointFileChecksumEnabled: Boolean = getConf(STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED) + def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED) def useDeprecatedKafkaOffsetFetching: Boolean = getConf(USE_DEPRECATED_KAFKA_OFFSET_FETCHING) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala index 982cc13c40868..259fe60c510c1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala @@ -91,6 +91,8 @@ trait CheckpointFileManager { * checkpoint path. */ def createCheckpointDirectory(): Path + + def close(): Unit = {} } object CheckpointFileManager extends Logging { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManager.scala new file mode 100644 index 0000000000000..9d3b808aa48c2 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManager.scala @@ -0,0 +1,510 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.streaming + +import java.io.{FileNotFoundException, InputStream} +import java.nio.charset.StandardCharsets +import java.util.concurrent.TimeUnit +import java.util.zip.{CheckedInputStream, CheckedOutputStream, CRC32C} + +import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration.Duration +import scala.io.Source + +import com.fasterxml.jackson.annotation.JsonInclude.Include +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} +import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule} +import org.apache.hadoop.fs._ +import org.json4s.{Formats, NoTypeHints} +import org.json4s.jackson.Serialization + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.SparkException +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKeys.{CHECKSUM, NUM_BYTES, PATH, TIMEOUT} +import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream +import org.apache.spark.util.ThreadUtils + +/** Information about the creator of the checksum file. Useful for debugging */ +case class ChecksumFileCreatorInfo( + executorId: String, + taskInfo: String) + +object ChecksumFileCreatorInfo { + def apply(): ChecksumFileCreatorInfo = { + val executorId = Option(SparkEnv.get).map(_.executorId).getOrElse("") + val taskInfo = Option(TaskContext.get()).map(tc => + s"Task= ${tc.partitionId()}.${tc.attemptNumber()}, " + + s"Stage= ${tc.stageId()}.${tc.stageAttemptNumber()}").getOrElse("") + new ChecksumFileCreatorInfo(executorId, taskInfo) + } +} + +/** This is the content of the checksum file. + * Holds the checksum value and additional information */ +case class Checksum( + algorithm: String, + // Making this a string to be agnostic of algorithm used + value: String, + mainFileSize: Long, + timestampMs: Long, + creator: ChecksumFileCreatorInfo) { + + import Checksum._ + + def json(): String = { + mapper.writeValueAsString(this) + } +} + +object Checksum { + implicit val format: Formats = Serialization.formats(NoTypeHints) + + /** Used to convert between class and JSON. */ + lazy val mapper = { + val _mapper = new ObjectMapper with ClassTagExtensions + _mapper.setSerializationInclusion(Include.NON_ABSENT) + _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + _mapper.registerModule(DefaultScalaModule) + _mapper + } + + def fromJson(json: String): Checksum = { + Serialization.read[Checksum](json) + } +} + +/** Holds the path of the checksum file and knows the main file path. */ +case class ChecksumFile(path: Path) { + import ChecksumCheckpointFileManager._ + assert(isChecksumFile(path), "path is not a checksum file") + + val mainFilePath = new Path(path.toString.stripSuffix(CHECKSUM_FILE_SUFFIX)) + + /** The name of the file without any extensions e.g. my-file.txt.crc returns my-file */ + val baseName: String = path.getName.split("\\.").head +} + +/** + * A [[CheckpointFileManager]] that creates a checksum file for the main file. + * This wraps another [[CheckpointFileManager]] and adds checksum functionality on top of it. + * Under the hood, when a file is created, it also creates a checksum file with the same name as + * the main file but adds a suffix. It returns [[ChecksumCancellableFSDataOutputStream]] + * which handles the writing of the main file and checksum file. 
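+ *
+ * A minimal usage sketch, for illustration only (`rootPath`, `file`, `bytes` and `hadoopConf`
+ * below are placeholders, not part of this change):
+ * {{{
+ *   val fm = new ChecksumCheckpointFileManager(
+ *     CheckpointFileManager.create(rootPath, hadoopConf), numThreads = 2)
+ *   val out = fm.createAtomic(file, overwriteIfPossible = false) // also creates the .crc file
+ *   out.write(bytes)
+ *   out.close()
+ *   val in = fm.open(file) // returns a [[ChecksumFSDataInputStream]]
+ *   // ... read the stream ...
+ *   in.close() // checksum verification happens here
+ * }}}
+ *
+ * For illustration, the .crc file itself holds a single JSON document (see [[Checksum]]),
+ * for example (the values shown here are made up):
+ * {{{
+ *   {"algorithm":"CRC32C","value":"-1468634763","mainFileSize":1024,
+ *    "timestampMs":1714540451000,"creator":{"executorId":"driver","taskInfo":""}}
+ * }}}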
+ * + * When a file is opened, it returns [[ChecksumFSDataInputStream]], which handles reading + * the main file and checksum file and does the checksum verification. + * + * In order to reduce the impact of reading/writing 2 files instead of 1, it uses a threadpool + * to read/write both files concurrently. + * + * @note + * It is able to read files written by other [[CheckpointFileManager]], that don't have checksum. + * It automatically deletes the checksum file when the main file is deleted. + * If you delete the main file with a different type of manager, then the checksum file will be + * left behind (i.e. orphan checksum file), since they don't know about it. It would be your + * responsibility to delete the orphan checksum files. + * + * @param underlyingFileMgr The file manager to use under the hood + * @param allowConcurrentDelete If true, allows deleting the main and checksum file concurrently. + * This is a perf optimization, but can potentially lead to + * orphan checksum files. If using this, it is your responsibility + * to clean up the potential orphan checksum files. + * @param numThreads This is the number of threads to use for the thread pool, for reading/writing + * files. To avoid blocking, if the file manager instance is being used by a + * single thread, then you can set this to 2 (one thread for main file, another + * for checksum file). + * If file manager is shared by multiple threads, you can set it to + * number of threads using file manager * 2. + * Setting this differently can lead to file operation being blocked waiting for + * a free thread. + */ +class ChecksumCheckpointFileManager( + private val underlyingFileMgr: CheckpointFileManager, + val allowConcurrentDelete: Boolean = false, + val numThreads: Int) + extends CheckpointFileManager with Logging { + assert(numThreads % 2 == 0, "numThreads must be a multiple of 2, we need 1 for the main file" + + "and another for the checksum file") + + import ChecksumCheckpointFileManager._ + + // This allows us to concurrently read/write the main file and checksum file + private val threadPool = ExecutionContext.fromExecutorService( + ThreadUtils.newDaemonFixedThreadPool(numThreads, s"${this.getClass.getSimpleName}-Thread")) + + override def list(path: Path, filter: PathFilter): Array[FileStatus] = { + underlyingFileMgr.list(path, filter) + } + + override def mkdirs(path: Path): Unit = { + underlyingFileMgr.mkdirs(path) + } + + override def createAtomic(path: Path, + overwriteIfPossible: Boolean): CancellableFSDataOutputStream = { + assert(!isChecksumFile(path), "Cannot directly create a checksum file") + + val mainFileFuture = Future { + underlyingFileMgr.createAtomic(path, overwriteIfPossible) + }(threadPool) + + val checksumFileFuture = Future { + underlyingFileMgr.createAtomic(getChecksumPath(path), overwriteIfPossible) + }(threadPool) + + new ChecksumCancellableFSDataOutputStream( + awaitResult(mainFileFuture, Duration.Inf), + path, + awaitResult(checksumFileFuture, Duration.Inf), + threadPool + ) + } + + override def open(path: Path): FSDataInputStream = { + assert(!isChecksumFile(path), "Cannot directly open a checksum file") + + val checksumInputStreamFuture = Future { + try { + Some(underlyingFileMgr.open(getChecksumPath(path))) + } catch { + // In case the client previously had file checksum disabled. + // Then previously created files won't have checksum. 
+ case _: FileNotFoundException => + logWarning(log"No checksum file found for ${MDC(PATH, path)}, " + + log"hence no checksum verification.") + None + } + }(threadPool) + + val mainInputStreamFuture = Future { + underlyingFileMgr.open(path) + }(threadPool) + + val mainStream = awaitResult(mainInputStreamFuture, Duration.Inf) + val checksumStream = awaitResult(checksumInputStreamFuture, Duration.Inf) + + checksumStream.map { chkStream => + new ChecksumFSDataInputStream(mainStream, path, chkStream, threadPool) + }.getOrElse(mainStream) + } + + override def exists(path: Path): Boolean = underlyingFileMgr.exists(path) + + override def delete(path: Path): Unit = { + // Allowing directly deleting the checksum file for orphan checksum file scenario + if (isChecksumFile(path)) { + deleteChecksumFile(path) + } else if (allowConcurrentDelete) { + // Ideally, we should first try to delete the checksum file + // before the main file, to avoid a situation where the main file is deleted but the + // checksum file deletion failed. The client might not call delete again if the main file + // no longer exists. + // But if allowConcurrentDelete is enabled, then we can do it concurrently for perf. + // But the client would be responsible for cleaning up potential orphan checksum files + // if it happens. + val checksumInputStreamFuture = Future { + deleteChecksumFile(getChecksumPath(path)) + }(threadPool) + + val mainInputStreamFuture = Future { + underlyingFileMgr.delete(path) + }(threadPool) + + awaitResult(mainInputStreamFuture, Duration.Inf) + awaitResult(checksumInputStreamFuture, Duration.Inf) + } else { + // First delete the checksum file, then main file + deleteChecksumFile(getChecksumPath(path)) + underlyingFileMgr.delete(path) + } + } + + private def deleteChecksumFile(checksumPath: Path): Unit = { + try { + underlyingFileMgr.delete(checksumPath) + logDebug(log"Deleted checksum file ${MDC(PATH, checksumPath)}") + } catch { + case _: FileNotFoundException => + // Ignore if file has already been deleted + // or the main file was created initially without checksum + logWarning(log"Skipping deletion of checksum file ${MDC(PATH, checksumPath)} " + + log"since it does not exist.") + } + } + + override def isLocal: Boolean = underlyingFileMgr.isLocal + + override def createCheckpointDirectory(): Path = { + underlyingFileMgr.createCheckpointDirectory() + } + + override def close(): Unit = { + threadPool.shutdown() + // Wait a bit for it to finish up in case there is any ongoing work + // Can consider making this timeout configurable, if needed + val timeoutMs = 100 + if (!threadPool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) { + logWarning(log"Thread pool did not shutdown after ${MDC(TIMEOUT, timeoutMs)} ms") + } + } +} + +private[streaming] object ChecksumCheckpointFileManager { + val CHECKSUM_FILE_SUFFIX = ".crc" + + def awaitResult[T](future: Future[T], atMost: Duration): T = { + try { + ThreadUtils.awaitResult(future, atMost) + } catch { + // awaitResult wraps the exception. 
Unwrap it, and throw the actual error + case e: SparkException if e.getMessage.contains("Exception thrown in awaitResult") => + throw e.getCause + } + } + + def getChecksumPath(mainFilePath: Path): Path = { + new Path(mainFilePath.toString + CHECKSUM_FILE_SUFFIX) + } + + def isChecksumFile(path: Path): Boolean = { + path.getName.endsWith(CHECKSUM_FILE_SUFFIX) + } +} + +/** An implementation of [[FSDataInputStream]] that calculates the checksum of the file + * that the client is reading (main file) incrementally, while it is being read. + * It then does checksum verification on close, to verify that the computed checksum + * matches the expected checksum in the checksum file. + * + * Computing the checksum incrementally and doing the verification after file read is complete + * is for better performance, instead of first reading the entire file and doing verification + * before the client starts reading the file. + * + * @param mainStream Input stream for the main file the client wants to read + * @param path The path of the main file + * @param expectedChecksumStream The input stream for the checksum file + * @param threadPool Thread pool to use for concurrently operating on the main and checksum file + * */ +class ChecksumFSDataInputStream( + private val mainStream: FSDataInputStream, + path: Path, + private val expectedChecksumStream: FSDataInputStream, + private val threadPool: ExecutionContext) + extends FSDataInputStream(new CheckedSequentialInputStream(mainStream)) with Logging { + + import ChecksumCheckpointFileManager._ + + @volatile private var verified = false + @volatile private var closed = false + + override def close(): Unit = { + if (!closed) { + // We verify the checksum only when the client is done reading. + verifyChecksum() + closeInternal() + } + } + + /** This is used to skip checksum verification on close. + * Avoid using this, and it is only used for a situation where the file is opened, read, + * then closed multiple times, and we want to avoid doing verification each time + * and only want to do it once. + * */ + def closeWithoutChecksumVerification(): Unit = { + if (!closed) { + logWarning(log"Closing file ${MDC(PATH, path)} without checksum verification") + closeInternal() + } + } + + private def closeInternal(): Unit = { + closed = true + + val mainCloseFuture = Future { + super.close() // close the main file + }(threadPool) + + val checksumCloseFuture = Future { + expectedChecksumStream.close() // close the checksum file + }(threadPool) + + awaitResult(mainCloseFuture, Duration.Inf) + awaitResult(checksumCloseFuture, Duration.Inf) + } + + private def verifyChecksum(): Unit = { + if (!verified) { + // It is possible the file was not read till the end by the reader, + // but we need the entire file content for checksum verification. + // Hence, we will read the file till the end from where reader stopped. + var remainingBytesRead = 0 + val buffer = new Array[Byte](1024) + var bytesRead = 0 + while ({ bytesRead = super.read(buffer); bytesRead != -1 }) { + remainingBytesRead += bytesRead + } + + // we are at the end position so that tells us the size + val computedFileSize = mainStream.getPos + + if (remainingBytesRead > 0) { + // Making this debug log since most of our files are not read exactly to the end + // and don't want to cause unnecessary noise in the logs. + logDebug(log"File ${MDC(PATH, path)} was not read till the end by reader. " + + log"Finished reading the rest of the file for checksum verification. 
" + + log"Remaining bytes read: ${MDC(NUM_BYTES, remainingBytesRead)}, " + + log"total size: ${MDC(NUM_BYTES, computedFileSize)}.") + } + + // Read the expected checksum from the checksum file + val expectedChecksumJson = Source.fromInputStream( + expectedChecksumStream, StandardCharsets.UTF_8.name()).mkString + val expectedChecksum = Checksum.fromJson(expectedChecksumJson) + // Get what we computed while the main file was being read locally + // `in` is the CheckedSequentialInputStream we created + val computedChecksumValue = in.asInstanceOf[CheckedInputStream].getChecksum.getValue.toInt + + logInfo(log"Verifying checksum for file ${MDC(PATH, path)}, " + + log"remainingBytesRead= ${MDC(NUM_BYTES, remainingBytesRead)}. " + + log"Computed(checksum= ${MDC(CHECKSUM, computedChecksumValue)}, " + + log"fileSize= ${MDC(NUM_BYTES, computedFileSize)})." + + log"Checksum file content: ${MDC(CHECKSUM, expectedChecksumJson)}") + + verified = true + + // Compare file size too, in case of collision + if (expectedChecksum.value.toInt != computedChecksumValue || + expectedChecksum.mainFileSize != computedFileSize) { + throw QueryExecutionErrors.checkpointFileChecksumVerificationFailed( + path, + expectedSize = expectedChecksum.mainFileSize, + expectedChecksum.value, + computedSize = computedFileSize, + computedChecksumValue.toString) + } + } + } +} + +/** This implements [[CheckedInputStream]] that allows us to compute the checksum as the file + * is being read. [[FSDataInputStream]] needs the passed in input stream to implement Seekable + * and PositionedReadable. We want the file to be read sequentially only to allow us to correctly + * compute the checksum. This is blocking the seekable apis from being used in the underlying stream + * */ +private class CheckedSequentialInputStream(data: InputStream) + // TODO: Make the checksum algo configurable + extends CheckedInputStream(data, new CRC32C()) + with Seekable + with PositionedReadable { + + // Seekable methods + override def seek(pos: Long): Unit = { + throw new UnsupportedOperationException("Seek not supported") + } + + override def getPos: Long = { + throw new UnsupportedOperationException("getPos not supported") + } + + override def seekToNewSource(targetPos: Long): Boolean = { + throw new UnsupportedOperationException("seekToNewSource not supported") + } + + // PositionedReadable methods + def read(position: Long, buffer: Array[Byte], offset: Int, length: Int): Int = { + throw new UnsupportedOperationException("read from position not supported") + } + + def readFully(position: Long, buffer: Array[Byte], offset: Int, length: Int): Unit = { + throw new UnsupportedOperationException("readFully from position not supported") + } + + def readFully(position: Long, buffer: Array[Byte]): Unit = { + throw new UnsupportedOperationException("readFully from position not supported") + } +} + +/** An implementation of [[CancellableFSDataOutputStream]] that calculates the checksum of the file + * that the client is writing (main file) incrementally, while it is being written. + * It then writes the main file and an additional checksum file, which will be used for verification + * by [[ChecksumFSDataInputStream]] on file read. 
+ * + * @param mainStream Output stream for the main file the client wants to write to + * @param path The path of the main file + * @param checksumStream Output stream for the checksum file to write the computed checksum + * @param uploadThreadPool Thread pool used to concurrently upload the main and checksum file + * */ +class ChecksumCancellableFSDataOutputStream( + private val mainStream: CancellableFSDataOutputStream, + path: Path, + private val checksumStream: CancellableFSDataOutputStream, + private val uploadThreadPool: ExecutionContext) + // TODO: make the checksum algo configurable + // CheckedOutputStream creates the checksum value as we write to the stream + extends CancellableFSDataOutputStream(new CheckedOutputStream(mainStream, new CRC32C())) + with Logging { + + import ChecksumCheckpointFileManager._ + + @volatile private var closed = false + + override def cancel(): Unit = { + val mainFuture = Future { + mainStream.cancel() + }(uploadThreadPool) + + val checksumFuture = Future { + checksumStream.cancel() + }(uploadThreadPool) + + awaitResult(mainFuture, Duration.Inf) + awaitResult(checksumFuture, Duration.Inf) + } + + override def close(): Unit = { + if (!closed) { + closed = true + + // Create this within the caller thread + val creator = ChecksumFileCreatorInfo() + val chkValue = underlyingStream.asInstanceOf[CheckedOutputStream].getChecksum.getValue.toInt + val mainFileSize = mainStream.getPos + + val mainFuture = Future { + super.close() // close the main file + }(uploadThreadPool) + + val checksumFuture = Future { + val json = Checksum( + algorithm = "CRC32C", + value = chkValue.toString, + mainFileSize = mainFileSize, + timestampMs = System.currentTimeMillis(), + creator = creator).json() + + logInfo(log"Created checksum for file ${MDC(PATH, path)}: ${MDC(CHECKSUM, json)}") + checksumStream.write(json.getBytes(StandardCharsets.UTF_8)) + checksumStream.close() + }(uploadThreadPool) + + awaitResult(mainFuture, Duration.Inf) + awaitResult(checksumFuture, Duration.Inf) + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index 98d49596d11b4..330b578d9e91c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -36,7 +36,7 @@ import org.apache.spark.internal.{Logging, LogKeys, MDC, MessageWithContext} import org.apache.spark.io.CompressionCodec import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.errors.QueryExecutionErrors -import org.apache.spark.sql.execution.streaming.CheckpointFileManager +import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, ChecksumCheckpointFileManager, ChecksumFile} import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream import org.apache.spark.sql.types.StructType import org.apache.spark.util.{SizeEstimator, Utils} @@ -389,6 +389,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with // `loadedMaps` will be de-referenced and GCed automatically when their reference // counts become 0. 
synchronized { loadedMaps.clear() } + fm.close() } override def supportedCustomMetrics: Seq[StateStoreCustomMetric] = { @@ -426,7 +427,23 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with private lazy val loadedMaps = new util.TreeMap[Long, HDFSBackedStateStoreMap]( Ordering[Long].reverse) private lazy val baseDir = stateStoreId.storeCheckpointLocation() - private lazy val fm = CheckpointFileManager.create(baseDir, hadoopConf) + private lazy val fm = { + val mgr = CheckpointFileManager.create(baseDir, hadoopConf) + if (storeConf.checkpointFileChecksumEnabled) { + new ChecksumCheckpointFileManager( + mgr, + // Allowing this for perf, since we do orphan checksum file cleanup in maintenance anyway + allowConcurrentDelete = true, + // We need 2 threads per fm caller to avoid blocking + // (one for main file and another for checksum file). + // Since this fm is used by both query task and maintenance thread, + // then we need 2 * 2 = 4 threads. + numThreads = 4) + } else { + mgr + } + } + private lazy val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf) private val loadedMapCacheHitCount: LongAdder = new LongAdder @@ -468,7 +485,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with * Note that this will look up the files to determined the latest known version. */ private[state] def latestIterator(): Iterator[UnsafeRowPair] = synchronized { - val versionsInFiles = fetchFiles().map(_.version).toSet + val storeFiles = fetchFiles()._1 + val versionsInFiles = storeFiles.map(_.version).toSet val versionsLoaded = loadedMaps.keySet.asScala val allKnownVersions = versionsInFiles ++ versionsLoaded if (allKnownVersions.nonEmpty) { @@ -804,7 +822,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with /** Perform a snapshot of the store to allow delta files to be consolidated */ private def doSnapshot(opType: String): Unit = { try { - val (files, e1) = Utils.timeTakenMs(fetchFiles()) + val ((files, _), e1) = Utils.timeTakenMs(fetchFiles()) logDebug(s"fetchFiles() took $e1 ms.") if (files.nonEmpty) { @@ -834,7 +852,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with */ private[state] def cleanup(): Unit = { try { - val (files, e1) = Utils.timeTakenMs(fetchFiles()) + val ((files, checksumFiles), e1) = Utils.timeTakenMs(fetchFiles()) logDebug(s"fetchFiles() took $e1 ms.") if (files.nonEmpty) { @@ -852,6 +870,19 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with log"${MDC(LogKeys.FILE_VERSION, earliestFileToRetain.version)} for " + log"${MDC(LogKeys.STATE_STORE_PROVIDER, this)}: " + log"${MDC(LogKeys.FILE_NAME, filesToDelete.mkString(", "))}") + + // Do this only if we see checksum files in the initial dir listing + // To avoid checking for orphan checksum files, if there is no checksum files + // (e.g. 
file checksum was never enabled) + if (checksumFiles.nonEmpty) { + StateStoreProvider.deleteOrphanChecksumFiles( + fm, + checksumFiles, + filesToDelete.map(_.path), + earliestFileToRetain.version, + storeConf.checkpointFileChecksumEnabled + ) + } } } } catch { @@ -888,7 +919,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with } /** Fetch all the files that back the store */ - private def fetchFiles(): Seq[StoreFile] = { + private def fetchFiles(): (Seq[StoreFile], Seq[ChecksumFile]) = { val files: Seq[FileStatus] = try { fm.list(baseDir).toImmutableArraySeq } catch { @@ -915,8 +946,13 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with } } val storeFiles = versionToFiles.values.toSeq.sortBy(_.version) - logDebug(s"Current set of files for $this: ${storeFiles.mkString(", ")}") - storeFiles + val checksumFiles = files + .filter(f => ChecksumCheckpointFileManager.isChecksumFile(f.getPath)) + .map(f => ChecksumFile(f.getPath)) + logDebug(s"Current set of files for $this: ${storeFiles.mkString(", ")}, " + + s"checksum files: ${checksumFiles.mkString(", ")}") + + (storeFiles, checksumFiles) } private def compressStream(outputStream: DataOutputStream): DataOutputStream = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 07553f51c60e1..7ee8a06d34718 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -138,6 +138,12 @@ class RocksDB( private val workingDir = createTempDir("workingDir") + // We need 2 threads per fm caller to avoid blocking + // (one for main file and another for checksum file). + // Since this fm is used by both query task and maintenance thread, + // then we need 2 * 2 = 4 threads. 
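+ // In general, a file manager shared by N concurrent callers needs about 2 * N threads;
+ // see the numThreads documentation on ChecksumCheckpointFileManager.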
+ protected val fileChecksumThreadPoolSize: Option[Int] = Some(4) + protected def createFileManager( dfsRootDir: String, localTempDir: File, @@ -149,7 +155,9 @@ class RocksDB( localTempDir, hadoopConf, codecName, - loggingId = loggingId + loggingId = loggingId, + fileChecksumEnabled = conf.fileChecksumEnabled, + fileChecksumThreadPoolSize = fileChecksumThreadPoolSize ) } @@ -1247,6 +1255,8 @@ class RocksDB( silentDeleteRecursively(localRootDir, "closing RocksDB") // Clear internal maps to reset the state clearColFamilyMaps() + + fileManager.close() } catch { case e: Exception => logWarning("Error closing RocksDB", e) @@ -1791,7 +1801,8 @@ case class RocksDBConf( compressionCodec: String, allowFAllocate: Boolean, compression: String, - reportSnapshotUploadLag: Boolean) + reportSnapshotUploadLag: Boolean, + fileChecksumEnabled: Boolean) object RocksDBConf { /** Common prefix of all confs in SQLConf that affects RocksDB */ @@ -1975,7 +1986,8 @@ object RocksDBConf { storeConf.compressionCodec, getBooleanConf(ALLOW_FALLOCATE_CONF), getStringConf(COMPRESSION_CONF), - storeConf.reportSnapshotUploadLag) + storeConf.reportSnapshotUploadLag, + storeConf.checkpointFileChecksumEnabled) } def apply(): RocksDBConf = apply(new StateStoreConf()) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala index 562a57aafbd41..8e75dbb06a475 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala @@ -40,7 +40,7 @@ import org.apache.spark.{SparkConf, SparkEnv, SparkException} import org.apache.spark.internal.{Logging, LogKeys, MDC, MessageWithContext} import org.apache.spark.io.CompressionCodec import org.apache.spark.sql.errors.QueryExecutionErrors -import org.apache.spark.sql.execution.streaming.CheckpointFileManager +import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, ChecksumCheckpointFileManager, ChecksumFile} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.ArrayImplicits._ import org.apache.spark.util.Utils @@ -122,13 +122,18 @@ import org.apache.spark.util.Utils * @param localTempDir Local directory for temporary work * @param hadoopConf Hadoop configuration for talking to DFS * @param loggingId Id that will be prepended in logs for isolating concurrent RocksDBs + * @param fileChecksumEnabled Whether file checksum generation and verification is enabled + * @param fileChecksumThreadPoolSize Number of threads used to concurrently operate on the + * main and checksum files */ class RocksDBFileManager( dfsRootDir: String, localTempDir: File, hadoopConf: Configuration, codecName: String = CompressionCodec.ZSTD, - loggingId: String = "") + loggingId: String = "", + fileChecksumEnabled: Boolean = false, + fileChecksumThreadPoolSize: Option[Int] = None) extends Logging { import RocksDBImmutableFile._ @@ -137,7 +142,19 @@ class RocksDBFileManager( new Path(myDfsRootDir).getFileSystem(myHadoopConf) } - private lazy val fm = CheckpointFileManager.create(new Path(dfsRootDir), hadoopConf) + private lazy val fm = { + val mgr = CheckpointFileManager.create(new Path(dfsRootDir), hadoopConf) + if (fileChecksumEnabled) { + new ChecksumCheckpointFileManager( + mgr, + // Allowing this for perf, since we do orphan checksum file cleanup in maintenance anyway + allowConcurrentDelete 
= true, + numThreads = fileChecksumThreadPoolSize.get) + } else { + mgr + } + } + private val fs = getFileSystem(dfsRootDir, hadoopConf) private val onlyZipFiles = new PathFilter { override def accept(path: Path): Boolean = path.toString.endsWith(".zip") @@ -161,6 +178,10 @@ class RocksDBFileManager( private val versionToRocksDBFiles = new ConcurrentHashMap[(Long, Option[String]), Seq[RocksDBImmutableFile]]() + def close(): Unit = { + fm.close() + } + /** * Get the changelog version based on rocksDB features. * @return the version of changelog @@ -342,6 +363,8 @@ class RocksDBFileManager( } else { // Delete all non-immutable files in local dir, and unzip new ones from DFS commit file listRocksDBFiles(localDir)._2.foreach(_.delete()) + // TODO: We are using fs here to read the file, checksum verification wouldn't happen + // for the file if enabled, since it is not using fm to open it. Utils.unzipFilesFromFile(fs, dfsBatchZipFile(version, checkpointUniqueId), localDir) // Copy the necessary immutable files @@ -687,6 +710,30 @@ class RocksDBFileManager( deleteChangelogFiles(changelogVersionsAndUniqueIdsToDelete) + // Delete orphan checksum files + val checksumFiles = allFiles + .filter(ChecksumCheckpointFileManager.isChecksumFile) + .map(ChecksumFile) + + // Do this only if we see checksum files in the initial dir listing + // To avoid checking for orphan checksum files, if there is no checksum files + // (e.g. file checksum was never enabled) + if (checksumFiles.nonEmpty) { + // Set of the zip and changelog files that were deleted + val deletedStoreFiles = snapshotVersionsAndUniqueIdsToDelete + .map{ case (v, id) => dfsBatchZipFile(v, id) } ++ + changelogVersionsAndUniqueIdsToDelete + .map{ case (v, id) => dfsChangelogFile(v, id) } + + StateStoreProvider.deleteOrphanChecksumFiles( + fm, + checksumFiles.toSeq, + deletedStoreFiles.toSeq, + minVersionToRetain, + fileChecksumEnabled + ) + } + // Always set minSeenVersion for regular deletion frequency even if deletion fails. // This is safe because subsequent calls retry deleting old version files minSeenVersion = minVersionToRetain diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index 63936305c7cbb..322c02b6fceeb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.catalyst.util.UnsafeRowUtils import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} -import org.apache.spark.sql.execution.streaming.{StatefulOperatorStateInfo, StreamExecution} +import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, ChecksumFile, StatefulOperatorStateInfo, StreamExecution} import org.apache.spark.sql.types.StructType import org.apache.spark.util.{NextIterator, ThreadUtils, Utils} @@ -661,6 +661,55 @@ object StateStoreProvider extends Logging { } } + /** + * If file checksum is enabled, then when we delete store files, using fm.delete, + * then the corresponding checksum file would be deleted too. But there are situations + * where we can have orphan checksum files (i.e. the main file no longer exist), + * so we need to delete them too: + * 1. 
If the file was created when file checksum was enabled, but then file checksum + * was later disabled. In this case, fm.delete will not delete the checksum file. + * Since we use a specific fm (checksum file manager) when checksum is enabled. + * 2. We enable concurrent file deletion for the checksum file manager, + * for perf improvement, hence there is a slight chance deleting the main file was + * successful but the checksum file deletion failed. Hence, we need to clean it up by + * ourselves. If we don't want this behavior we can turn off concurrent deletion + * for the checksum file manager + * */ + private[streaming] def deleteOrphanChecksumFiles( + fm: CheckpointFileManager, + checksumFiles: Seq[ChecksumFile], + deletedStoreFiles: Seq[Path], + minVersionToRetain: Long, + fileChecksumEnabled: Boolean): Unit = { + // Use file name instead since path format might be different + // i.e. checksum files might have scheme prefix and the deleted files might not. + val deletedStoreFilesSet = deletedStoreFiles.map(_.getName).toSet + val oldChecksumFiles = checksumFiles + .filter(f => f.baseName.split("_")(0).toLong < minVersionToRetain) + + val orphanChecksumFiles = if (fileChecksumEnabled) { + oldChecksumFiles + // old checksum files and was not part of the store files deleted + .filterNot(f => deletedStoreFilesSet.contains(f.mainFilePath.getName)) + } else { + // all old checksum files, in case file checksum was previously enabled + oldChecksumFiles + } + + val (_, dur) = Utils.timeTakenMs { + orphanChecksumFiles.foreach { f => + fm.delete(f.path) + } + } + + if (orphanChecksumFiles.nonEmpty) { + logInfo(log"Deleted orphan checksum files older than version " + + log"${MDC(LogKeys.FILE_VERSION, minVersionToRetain)} " + + log"(timeTakenMs = ${MDC(LogKeys.DURATION, dur)}): " + + log"${MDC(LogKeys.FILE_NAME, orphanChecksumFiles.mkString(", "))}") + } + } + /** * Get the runId from the provided hadoopConf. If it is not found, generate a random UUID. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala index b4fbb5560f2f4..38fd299953c21 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala @@ -32,7 +32,7 @@ import org.apache.spark.internal.LogKeys._ import org.apache.spark.io.CompressionCodec import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.errors.QueryExecutionErrors -import org.apache.spark.sql.execution.streaming.CheckpointFileManager +import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, ChecksumFSDataInputStream} import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream import org.apache.spark.sql.execution.streaming.state.RecordType.RecordType import org.apache.spark.util.NextIterator @@ -388,6 +388,15 @@ class StateStoreChangelogReaderFactory( } } finally { if (input != null) { + sourceStream match { + case c: ChecksumFSDataInputStream => + // No need to do checksum verification since the reader is still going to read the + // entire file. To avoid double verification and since we only read version here. + // When input.close() below is called, nothing happens for sourceStream + // since it is already closed here. 
+ c.closeWithoutChecksumVerification() + case _ => + } input.close() // input is not set to null because it is effectively lazy. } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala index e0450cfc4f694..fd802768da454 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala @@ -82,6 +82,9 @@ class StateStoreConf( /** The compression codec used to compress delta and snapshot files. */ val compressionCodec: String = sqlConf.stateStoreCompressionCodec + /** Whether file checksum generation and verification is enabled. */ + val checkpointFileChecksumEnabled: Boolean = sqlConf.checkpointFileChecksumEnabled + /** whether to validate state schema during query run. */ val stateSchemaCheckEnabled = sqlConf.isStateSchemaCheckEnabled diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala index cdf736b1fffca..e45332fa55b92 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala @@ -39,14 +39,14 @@ abstract class CheckpointFileManagerTests extends SparkFunSuite { protected def createManager(path: Path): CheckpointFileManager - private implicit class RichCancellableStream(stream: CancellableFSDataOutputStream) { + protected implicit class RichCancellableStream(stream: CancellableFSDataOutputStream) { def writeContent(i: Int): CancellableFSDataOutputStream = { stream.writeInt(i) stream } } - private implicit class RichFSDataInputStream(stream: FSDataInputStream) { + protected implicit class RichFSDataInputStream(stream: FSDataInputStream) { def readContent(): Int = { val res = stream.readInt() stream.close() @@ -111,6 +111,7 @@ abstract class CheckpointFileManagerTests extends SparkFunSuite { fm.open(path) } fm.delete(path) // should not throw exception + fm.close() } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManagerSuite.scala new file mode 100644 index 0000000000000..e3de7c17fa0f5 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManagerSuite.scala @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.streaming + +import java.io._ +import java.nio.charset.StandardCharsets + +import scala.io.Source + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs._ + +import org.apache.spark.SparkException + +abstract class ChecksumCheckpointFileManagerSuite extends CheckpointFileManagerTestsOnLocalFs { + import ChecksumCheckpointFileManager._ + + override protected def checkLeakingCrcFiles(path: Path): Unit = { + // The local implementation of hadoop file system may create crc files (e.g .foo.crc). + // This will validate that those crc files are not leaked (i.e. no orphan crc file). + // Note that for cloud implementation of hadoop file system, crc files are not created. + super.checkLeakingCrcFiles(path) + + // Now let's validate our own crc files to make sure no orphan. + val files = new File(path.toString).listFiles().toSeq + .filter(f => f.isFile).map(f => new Path(f.toPath.toUri)) + val checksumFiles = files + // filter out hadoop crc files if present (e.g .foo.crc) + .filterNot(p => p.getName.startsWith(".") && p.getName.endsWith(".crc")) + .filter(isChecksumFile) + .map(ChecksumFile) + val mainFilesForExistingChecksumFiles = checksumFiles.map(_.mainFilePath) + + // Check all main files exist for all checksum files. + assert(mainFilesForExistingChecksumFiles.toSet.subsetOf(files.toSet), + s"Some checksum files don't have main files - checksum files: $checksumFiles / " + + s"expected main files: $mainFilesForExistingChecksumFiles / actual files: $files") + } + + /** Create a normal CheckpointFileManager (not the checksum checkpoint manager) */ + protected def createNoChecksumManager(path: Path): CheckpointFileManager + + private def makeDir(fm: CheckpointFileManager, dir: Path): Unit = { + assert(!fm.exists(dir)) + fm.mkdirs(dir) + assert(fm.exists(dir)) + } + + private val content = 123456789 + private val fileSize = 4 + + test("detect corrupt file") { + withTempHadoopPath { basePath => + val checksumFm = createManager(basePath) + // Mkdirs + val dir = new Path(s"$basePath/dir/subdir/subsubdir") + makeDir(checksumFm, dir) + + // Create file with checksum + val path = new Path(s"$dir/file") + checksumFm.createAtomic(path, overwriteIfPossible = false).writeContent(content).close() + assert(checksumFm.exists(path)) + + // First verify the content of the checksum file + val regularFm = createNoChecksumManager(basePath) + val checksumStream = regularFm.open(getChecksumPath(path)) + val checksum = Checksum.fromJson( + Source.fromInputStream(checksumStream, StandardCharsets.UTF_8.name()).mkString) + assert(checksum.mainFileSize == fileSize) + checksumStream.close() + + // now corrupt the file + // overwrite the file with a different content. This wouldn't update the checksum file. + regularFm.createAtomic(path, overwriteIfPossible = true).writeContent(content % 10).close() + + val checksumError = intercept[SparkException] { + // Now try to read the file with the checksum manager. 
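+ // Checksum verification runs when the returned stream is closed, so close() should throw.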
+ checksumFm.open(path).close() + } + + checkError( + exception = checksumError, + condition = "CHECKPOINT_FILE_CHECKSUM_VERIFICATION_FAILED", + parameters = Map( + "fileName" -> path.toString, + "expectedSize" -> fileSize.toString, + "expectedChecksum" -> "^-?\\d+$", // integer + "computedSize" -> fileSize.toString, + "computedChecksum" -> "^-?\\d+$"), // integer + matchPVals = true) + + checksumFm.close() + } + } + + test("non sequential read is not allowed") { + withTempHadoopPath { basePath => + val checksumFm = createManager(basePath) + // Mkdirs + val dir = new Path(s"$basePath/dir/subdir/subsubdir") + makeDir(checksumFm, dir) + + // Create file + val path = new Path(s"$dir/file") + checksumFm.createAtomic(path, overwriteIfPossible = false).writeContent(content).close() + assert(checksumFm.exists(path)) + + // Attempt non sequential read + val inputStream = checksumFm.open(path) + + val unsupported: Seq[FSDataInputStream => Unit] = Seq( + _.seek(1), + _.getPos(), + _.seekToNewSource(1), + _.read(1, new Array[Byte](1), 0, 1), + _.readFully(1, new Array[Byte](1), 0, 1), + _.readFully(1, new Array[Byte](1)) + ) + + unsupported.foreach(op => { + intercept[UnsupportedOperationException] { + op(inputStream) + } + }) + + checksumFm.close() + } + } + + test("checksum manager can read a file written by other manager") { + withTempHadoopPath { basePath => + val regularFm = createNoChecksumManager(basePath) + // Mkdirs + val dir = new Path(s"$basePath/dir/subdir/subsubdir") + makeDir(regularFm, dir) + + // Create a file using another manager + val path = new Path(s"$dir/file") + regularFm.createAtomic(path, overwriteIfPossible = false).writeContent(content).close() + assert(regularFm.exists(path)) + + // Now try to read the file with the checksum manager. + val checksumFm = createManager(basePath) + assert(checksumFm.open(path).readContent() == content) + checksumFm.close() + } + } + + test("other manager can read a file written by checksum manager") { + withTempHadoopPath { basePath => + val checksumFm = createManager(basePath) + // Mkdirs + val dir = new Path(s"$basePath/dir/subdir/subsubdir") + makeDir(checksumFm, dir) + + // Create a file using checksum manager + val path = new Path(s"$dir/file") + checksumFm.createAtomic(path, overwriteIfPossible = false).writeContent(content).close() + assert(checksumFm.exists(path)) + checksumFm.close() + + // Now try to read the file with other manager. 
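+ // A checksum-unaware manager only reads the main file; the companion .crc file is ignored.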
+ val regularFm = createNoChecksumManager(basePath) + assert(regularFm.open(path).readContent() == content) + } + } +} + +class FileContextChecksumCheckpointFileManagerSuite extends ChecksumCheckpointFileManagerSuite { + override def createManager(path: Path): CheckpointFileManager = { + new ChecksumCheckpointFileManager( + createNoChecksumManager(path), + allowConcurrentDelete = true, + numThreads = 4) + } + + protected def createNoChecksumManager(path: Path): CheckpointFileManager = { + new FileContextBasedCheckpointFileManager(path, new Configuration()) + } +} + +class FileSystemChecksumCheckpointFileManagerSuite extends ChecksumCheckpointFileManagerSuite { + override def createManager(path: Path): CheckpointFileManager = { + new ChecksumCheckpointFileManager( + createNoChecksumManager(path), + allowConcurrentDelete = true, + numThreads = 4) + } + + protected def createNoChecksumManager(path: Path): CheckpointFileManager = { + new FileSystemBasedCheckpointFileManager(path, new Configuration()) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/FailureInjectionCheckpointFileManager.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/FailureInjectionCheckpointFileManager.scala index 9429cd5ef39ef..76b68205c2eef 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/FailureInjectionCheckpointFileManager.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/FailureInjectionCheckpointFileManager.scala @@ -309,7 +309,9 @@ object FailureInjectionRocksDBStateStoreProvider { localTempDir, hadoopConf, codecName, - loggingId = loggingId) { + loggingId = loggingId, + fileChecksumEnabled = this.conf.fileChecksumEnabled, + fileChecksumThreadPoolSize = this.fileChecksumThreadPoolSize) { override def getFileSystem( myDfsRootDir: String, myHadoopConf: Configuration): FileSystem = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala index b135086821886..e4a08fda88a06 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala @@ -2136,6 +2136,14 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid sqlConf = Some(getDefaultSQLConf(minDeltasForSnapshot, numOfVersToRetainInMemory))) } + override def newStoreProviderWithClonedConf( + storeId: StateStoreId): RocksDBStateStoreProvider = { + newStoreProvider( + storeId, + NoPrefixKeyStateEncoderSpec(keySchema), + sqlConf = Some(cloneSQLConf())) + } + override def getDefaultSQLConf( minDeltasForSnapshot: Int, numOfVersToRetainInMemory: Int): SQLConf = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala index 08648148b4af4..baf82dc81cf7a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala @@ -738,13 +738,13 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] // Put should create a temp file put(store0, "a", 0, 1) - assert(numTempFiles === 1) + assert(numTempFiles 
=== 2) assert(numDeltaFiles === 0) // Commit should remove temp file and create a delta file store0.commit() assert(numTempFiles === 0) - assert(numDeltaFiles === 1) + assert(numDeltaFiles === 2) // Remove should create a temp file val store1 = shouldNotCreateTempFile { @@ -754,13 +754,13 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] version = 1, None, None, useColumnFamilies = false, storeConf, hadoopConf) } remove(store1, _._1 == "a") - assert(numTempFiles === 1) - assert(numDeltaFiles === 1) + assert(numTempFiles === 2) + assert(numDeltaFiles === 2) // Commit should remove temp file and create a delta file store1.commit() assert(numTempFiles === 0) - assert(numDeltaFiles === 2) + assert(numDeltaFiles === 4) // Commit without any updates should create a delta file val store2 = shouldNotCreateTempFile { @@ -771,7 +771,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] } store2.commit() assert(numTempFiles === 0) - assert(numDeltaFiles === 3) + assert(numDeltaFiles === 6) } test("SPARK-21145: Restarted queries create new provider instances") { @@ -980,6 +980,15 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] numOfVersToRetainInMemory = numOfVersToRetainInMemory) } + override def newStoreProviderWithClonedConf( + storeId: StateStoreId): HDFSBackedStateStoreProvider = { + newStoreProvider( + storeId.operatorId, + storeId.partitionId, + dir = storeId.checkpointRootLocation, + sqlConfOpt = Some(cloneSQLConf())) + } + override def getLatestData( storeProvider: HDFSBackedStateStoreProvider, useColumnFamilies: Boolean = false): Set[((String, Int), Int)] = { @@ -1008,6 +1017,8 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] sqlConf.setConf(SQLConf.MAX_BATCHES_TO_RETAIN_IN_MEMORY, numOfVersToRetainInMemory) sqlConf.setConf(SQLConf.MIN_BATCHES_TO_RETAIN, 2) sqlConf.setConf(SQLConf.STATE_STORE_COMPRESSION_CODEC, SQLConf.get.stateStoreCompressionCodec) + sqlConf.setConf( + SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED, SQLConf.get.checkpointFileChecksumEnabled) sqlConf } @@ -1017,10 +1028,12 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] keyStateEncoderSpec: KeyStateEncoderSpec = NoPrefixKeyStateEncoderSpec(keySchema), keySchema: StructType = keySchema, dir: String = newDir(), + sqlConfOpt: Option[SQLConf] = None, minDeltasForSnapshot: Int = SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get, numOfVersToRetainInMemory: Int = SQLConf.MAX_BATCHES_TO_RETAIN_IN_MEMORY.defaultValue.get, hadoopConf: Configuration = new Configuration): HDFSBackedStateStoreProvider = { - val sqlConf = getDefaultSQLConf(minDeltasForSnapshot, numOfVersToRetainInMemory) + val sqlConf = sqlConfOpt.getOrElse( + getDefaultSQLConf(minDeltasForSnapshot, numOfVersToRetainInMemory)) val provider = new HDFSBackedStateStoreProvider() provider.init( StateStoreId(dir, opId, partition), @@ -1459,6 +1472,193 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider] } } + testWithAllCodec("file checksum can be enabled and disabled for the same checkpoint") { + _ => + val storeId = StateStoreId(newDir(), 0L, 1) + var version = 0L + + // Commit to store using file checksum + withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> true.toString) { + tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider => + val store = provider.getStore(version) + put(store, "1", 11, 100) + put(store, "2", 22, 200) + 
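+ // This commit goes through a checksum-enabled file manager, so each state file it writes
+ // gets a .crc companion file.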
version = store.commit() + } + } + + // Reload the store and commit without file checksum + withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> false.toString) { + tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider => + assert(version == 1) + val store = provider.getStore(version) + assert(get(store, "1", 11) === Some(100)) + assert(get(store, "2", 22) === Some(200)) + + put(store, "3", 33, 300) + put(store, "4", 44, 400) + version = store.commit() + } + } + + // Reload the store and commit with file checksum + withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> true.toString) { + tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider => + assert(version == 2) + val store = provider.getStore(version) + assert(get(store, "1", 11) === Some(100)) + assert(get(store, "2", 22) === Some(200)) + assert(get(store, "3", 33) === Some(300)) + assert(get(store, "4", 44) === Some(400)) + + put(store, "5", 55, 500) + version = store.commit() + } + } + } + + test("checksum files are also cleaned up during maintenance") { + val storeId = StateStoreId(newDir(), 0L, 1) + val numBatches = 6 + val minDeltas = 2 + // Adding 1 to ensure snapshot is uploaded. + // Snapshot upload might happen at minDeltas or minDeltas + 1, depending on the provider + val maintFrequency = minDeltas + 1 + var version = 0L + + withSQLConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> minDeltas.toString, + SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1", + // So that RocksDB will also generate changelog files + RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> + true.toString) { + + tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider => + (version + 1 to numBatches).foreach { i => + version = putAndCommitStore( + provider, loadVersion = i - 1, doMaintenance = i % maintFrequency == 0) + } + + // This is because, hdfs and rocksdb old files detection logic is different + provider match { + case _: HDFSBackedStateStoreProvider => + // For HDFS State store, files left: + // 3.delta to 6.delta (+ checksum file) + // 3.snapshot (+ checksum file), 6.snapshot (+ checksum file) + verifyChecksumFiles(storeId.storeCheckpointLocation().toString, + expectedNumFiles = 12, expectedNumChecksumFiles = 6) + case _ => + // For RocksDB State store, files left: + // 6.changelog (+ checksum file), 6.zip (+ checksum file) + verifyChecksumFiles(storeId.storeCheckpointLocation().toString, + expectedNumFiles = 4, expectedNumChecksumFiles = 2) + } + } + + // turn off file checksum, and verify that the previously created checksum files + // will be deleted by maintenance + withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> false.toString) { + tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider => + (version + 1 to version + numBatches).foreach { i => + version = putAndCommitStore( + provider, loadVersion = i - 1, doMaintenance = i % maintFrequency == 0) + } + + // now verify no checksum files are left + // This is because, hdfs and rocksdb old files detection logic is different + provider match { + case _: HDFSBackedStateStoreProvider => + // For HDFS State store, files left: + // 6.delta, 9.delta to 12.delta + // 9.snapshot, 12.snapshot + verifyChecksumFiles(storeId.storeCheckpointLocation().toString, + expectedNumFiles = 7, expectedNumChecksumFiles = 0) + case _ => + // For RocksDB State store, files left: + // 12.changelog, 12.zip + 
verifyChecksumFiles(storeId.storeCheckpointLocation().toString, + expectedNumFiles = 2, expectedNumChecksumFiles = 0) + } + } + } + } + } + + testWithAllCodec("overwrite state file without overwriting checksum file") { _ => + val storeId = StateStoreId(newDir(), 0L, 1) + val numBatches = 3 + val minDeltas = 2 + + withSQLConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> minDeltas.toString, + // So that RocksDB will also generate changelog files + RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> + true.toString) { + + // First run with file checksum enabled. It will generate state and checksum files. + // Turn off file checksum, and regenerate only the state files + Seq(true, false).foreach { fileChecksumEnabled => + withSQLConf( + SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> fileChecksumEnabled.toString) { + tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider => + (1 to numBatches).foreach { i => + putAndCommitStore( + provider, loadVersion = i - 1, doMaintenance = false) + } + + // This should only create snapshot and no delete + provider.doMaintenance() + + // number of files should be the same. + // 3 delta/changelog files, 1 snapshot (with checksum files) + verifyChecksumFiles(storeId.storeCheckpointLocation().toString, + expectedNumFiles = 8, expectedNumChecksumFiles = 4) + } + } + } + + withSQLConf( + SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> true.toString) { + // now try to load the store with checksum enabled. + // It will verify the overwritten state files with the checksum files. + (1 to numBatches).foreach { i => + tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider => + // load from DFS should be successful + val store = provider.getStore(i) + store.abort() + } + } + } + } + } + + private def verifyChecksumFiles( + dir: String, expectedNumFiles: Int, expectedNumChecksumFiles: Int): Unit = { + val allFiles = new File(dir) + // filter out dirs and local hdfs files + .listFiles().filter(f => f.isFile && !f.getName.startsWith(".")) + .map(f => new Path(f.toURI)).toSet + assert(allFiles.size == expectedNumFiles) + + val checksumFiles = allFiles.filter( + ChecksumCheckpointFileManager.isChecksumFile).map(ChecksumFile) + assert(checksumFiles.size == expectedNumChecksumFiles) + + // verify that no orphan checksum file i.e. 
the respective main file should be present + assert(checksumFiles.forall(c => allFiles.contains(c.mainFilePath))) + } + + private def putAndCommitStore( + provider: ProviderClass, loadVersion: Long, doMaintenance: Boolean): Long = { + val store = provider.getStore(loadVersion) + put(store, loadVersion.toString, loadVersion.toInt, loadVersion.toInt * 100) + val newVersion = store.commit() + + if (doMaintenance) { + provider.doMaintenance() + } + + newVersion + } + test("StateStore.get") { val conf = new SparkConf() .setMaster("local") @@ -1742,6 +1942,9 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider] /** Return a new provider with useColumnFamilies set to true */ def newStoreProvider(useColumnFamilies: Boolean): ProviderClass + /** Return a new provider with the given id using a clone of SQLConf */ + def newStoreProviderWithClonedConf(storeId: StateStoreId): ProviderClass + /** Get the latest data referred to by the given provider but not using this provider */ def getLatestData(storeProvider: ProviderClass, useColumnFamilies: Boolean): Set[((String, Int), Int)] @@ -1772,6 +1975,8 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider] /** Get the `SQLConf` by the given minimum delta and version to retain in memory */ def getDefaultSQLConf(minDeltasForSnapshot: Int, numOfVersToRetainInMemory: Int): SQLConf + def cloneSQLConf(): SQLConf = SQLConf.get.clone() + /** Get the `StateStoreConf` used by the tests with default setting */ def getDefaultStoreConf(): StateStoreConf = StateStoreConf.empty From d0eb08af8ac1df784203933c287c2c7dcc4cfd8e Mon Sep 17 00:00:00 2001 From: micheal-o Date: Thu, 1 May 2025 12:49:38 -0700 Subject: [PATCH 2/5] fix StateDataSourceTransformWithStateSuite and RocksDBCheckpointFailureInjectionSuite tests --- .../StateDataSourceTransformWithStateSuite.scala | 3 ++- .../RocksDBCheckpointFailureInjectionSuite.scala | 12 ++++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTransformWithStateSuite.scala index 7d242c7444f13..fb0666d482c4c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTransformWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTransformWithStateSuite.scala @@ -1072,7 +1072,8 @@ class StateDataSourceTransformWithStateSuite extends StateStoreMetricsTest val dfsRootDir = new File(tmpDir.getAbsolutePath + "/state/0/4") val fileManager = new RocksDBFileManager( dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration, - CompressionCodec.LZ4) + CompressionCodec.LZ4, fileChecksumEnabled = SQLConf.get.checkpointFileChecksumEnabled, + fileChecksumThreadPoolSize = Some(2)) // Read the changelog for one of the partitions at version 3 and // ensure that we have two entries diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBCheckpointFailureInjectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBCheckpointFailureInjectionSuite.scala index 0c3e457c8df1a..5fd95e2ff294f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBCheckpointFailureInjectionSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBCheckpointFailureInjectionSuite.scala @@ -251,8 +251,16 @@ class RocksDBCheckpointFailureInjectionSuite extends StreamTest injectionState.createAtomicDelayCloseRegex = Seq.empty db.load(1, checkpointId1) - - db.put("version", "2.2") + val value = if (ifEnableStateStoreCheckpointIds) { + // We can write a different value since the files will have different checkpointId. + "2.2" + } else { + // We must write the same value or else checksum verification will fail. + // This test is only overwriting the state file without overwriting checksum file. + // Also, since batches are deterministic in checkpoint v1. + "2.1" + } + db.put("version", value) checkpointId2 = commitAndGetCheckpointId(db) assert(injectionState.delayedStreams.nonEmpty) From ddb4cb8c23cf40e34e44a969b6598c1761f5af21 Mon Sep 17 00:00:00 2001 From: micheal-o Date: Thu, 1 May 2025 12:54:38 -0700 Subject: [PATCH 3/5] change closing without checksum to debug log --- .../execution/streaming/ChecksumCheckpointFileManager.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManager.scala index 9d3b808aa48c2..64843f008d728 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManager.scala @@ -328,7 +328,9 @@ class ChecksumFSDataInputStream( * */ def closeWithoutChecksumVerification(): Unit = { if (!closed) { - logWarning(log"Closing file ${MDC(PATH, path)} without checksum verification") + // Ideally this should be warning, but if a file is doing this frequently + // it will cause unnecessary noise in the logs. This can be changed later. + logDebug(log"Closing file ${MDC(PATH, path)} without checksum verification") closeInternal() } } From 7c98a4bb1390bfd93db620a9072665651f1d4569 Mon Sep 17 00:00:00 2001 From: micheal-o Date: Fri, 2 May 2025 11:23:45 -0700 Subject: [PATCH 4/5] nits --- .../org/apache/spark/sql/internal/SQLConf.scala | 2 +- .../streaming/ChecksumCheckpointFileManager.scala | 3 ++- .../sql/execution/streaming/state/RocksDB.scala | 1 - .../streaming/state/RocksDBFileManager.scala | 4 ++-- .../execution/streaming/state/StateStore.scala | 15 ++++++++++++--- 5 files changed, 17 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index c715daeec5975..53418139c209e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3129,7 +3129,7 @@ object SQLConf { val STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED = buildConf("spark.sql.streaming.checkpoint.fileChecksum.enabled") .internal() - .doc("When true, checksum would be generated and verified for some checkpoint files. " + + .doc("When true, checksum would be generated and verified for checkpoint files. 
" + "This is used to detect file corruption.") .version("4.1.0") .booleanConf diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManager.scala index 64843f008d728..541cc23f58b3d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManager.scala @@ -59,7 +59,8 @@ object ChecksumFileCreatorInfo { * Holds the checksum value and additional information */ case class Checksum( algorithm: String, - // Making this a string to be agnostic of algorithm used + // Making this a string to be agnostic of algorithm used and be easily readable. + // We can change this to byte array later if we start using algos with large values. value: String, mainFileSize: Long, timestampMs: Long, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 7ee8a06d34718..f453af07f85eb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -1255,7 +1255,6 @@ class RocksDB( silentDeleteRecursively(localRootDir, "closing RocksDB") // Clear internal maps to reset the state clearColFamilyMaps() - fileManager.close() } catch { case e: Exception => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala index 8e75dbb06a475..62d1bdf7e4c2d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala @@ -363,8 +363,8 @@ class RocksDBFileManager( } else { // Delete all non-immutable files in local dir, and unzip new ones from DFS commit file listRocksDBFiles(localDir)._2.foreach(_.delete()) - // TODO: We are using fs here to read the file, checksum verification wouldn't happen - // for the file if enabled, since it is not using fm to open it. + // TODO(SPARK-51988): We are using fs here to read the file, checksum verification + // wouldn't happen for the file if enabled, since it is not using fm to open it. 
Utils.unzipFilesFromFile(fs, dfsBatchZipFile(version, checkpointUniqueId), localDir) // Copy the necessary immutable files diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index 322c02b6fceeb..b2f98af405993 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -696,17 +696,26 @@ object StateStoreProvider extends Logging { oldChecksumFiles } + val failedFiles = mutable.ListBuffer[Path]() val (_, dur) = Utils.timeTakenMs { orphanChecksumFiles.foreach { f => - fm.delete(f.path) + try { + fm.delete(f.path) + } catch { + case NonFatal(e) => + failedFiles += f.path + logWarning(log"Failed to delete orphan checksum file " + + log"${MDC(LogKeys.FILE_NAME, f.path)}", e) + } } } if (orphanChecksumFiles.nonEmpty) { logInfo(log"Deleted orphan checksum files older than version " + log"${MDC(LogKeys.FILE_VERSION, minVersionToRetain)} " + - log"(timeTakenMs = ${MDC(LogKeys.DURATION, dur)}): " + - log"${MDC(LogKeys.FILE_NAME, orphanChecksumFiles.mkString(", "))}") + log"(timeTakenMs = ${MDC(LogKeys.DURATION, dur)}) - " + + log"Orphan files: ${MDC(LogKeys.FILE_NAME, orphanChecksumFiles.mkString(", "))}" + + log"Failed to Delete: ${MDC(LogKeys.FILE_NAME, failedFiles.mkString(", "))}") } } From 1e983989f181684a221f00c9051ea40d04dbaa81 Mon Sep 17 00:00:00 2001 From: micheal-o Date: Fri, 2 May 2025 11:35:04 -0700 Subject: [PATCH 5/5] just use int as val for now --- .../spark/sql/errors/QueryExecutionErrors.scala | 8 ++++---- .../streaming/ChecksumCheckpointFileManager.scala | 11 +++++------ 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 11abaf3979985..75f8ccffe810b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -2625,17 +2625,17 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE def checkpointFileChecksumVerificationFailed( file: Path, expectedSize: Long, - expectedChecksum: String, + expectedChecksum: Int, computedSize: Long, - computedChecksum: String): Throwable = { + computedChecksum: Int): Throwable = { new SparkException( errorClass = "CHECKPOINT_FILE_CHECKSUM_VERIFICATION_FAILED", messageParameters = Map( "fileName" -> file.toString, "expectedSize" -> expectedSize.toString, - "expectedChecksum" -> expectedChecksum, + "expectedChecksum" -> expectedChecksum.toString, "computedSize" -> computedSize.toString, - "computedChecksum" -> computedChecksum), + "computedChecksum" -> computedChecksum.toString), cause = null) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManager.scala index 541cc23f58b3d..440e7fc13cce4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManager.scala @@ -59,9 +59,8 @@ object ChecksumFileCreatorInfo { * Holds the checksum value and 
additional information */ case class Checksum( algorithm: String, - // Making this a string to be agnostic of algorithm used and be easily readable. - // We can change this to byte array later if we start using algos with large values. - value: String, + // We can make this a byte array later to be agnostic of algorithm used. + value: Int, mainFileSize: Long, timestampMs: Long, creator: ChecksumFileCreatorInfo) { @@ -392,14 +391,14 @@ class ChecksumFSDataInputStream( verified = true // Compare file size too, in case of collision - if (expectedChecksum.value.toInt != computedChecksumValue || + if (expectedChecksum.value != computedChecksumValue || expectedChecksum.mainFileSize != computedFileSize) { throw QueryExecutionErrors.checkpointFileChecksumVerificationFailed( path, expectedSize = expectedChecksum.mainFileSize, expectedChecksum.value, computedSize = computedFileSize, - computedChecksumValue.toString) + computedChecksumValue) } } } @@ -496,7 +495,7 @@ class ChecksumCancellableFSDataOutputStream( val checksumFuture = Future { val json = Checksum( algorithm = "CRC32C", - value = chkValue.toString, + value = chkValue, mainFileSize = mainFileSize, timestampMs = System.currentTimeMillis(), creator = creator).json()
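
A note on storing the CRC32C value as an Int: java.util.zip.CRC32C produces a 32-bit checksum (returned in the low 32 bits of a Long), so an Int field loses nothing for this algorithm; only a future move to wider digests would need the byte-array form mentioned in the comment. The sketch below is illustrative only and not part of the patch: it recomputes a CRC32C over a local file and compares both the value and the file size, mirroring the guard in the ChecksumFSDataInputStream hunk above. The helper names (RecordedChecksum, verifySidecarChecksum) are hypothetical, and it reads the whole file through java.nio rather than streaming through the CheckpointFileManager wrappers.

    import java.nio.file.{Files, Paths}
    import java.util.zip.CRC32C

    object ChecksumSketch {
      // Hypothetical stand-in for the fields the patch records per checkpoint file.
      final case class RecordedChecksum(value: Int, mainFileSize: Long)

      // Compute a CRC32C over a byte array. getValue returns the 32-bit result in
      // the low bits of a Long, so truncating to Int keeps the full checksum.
      def computeCrc32c(bytes: Array[Byte]): Int = {
        val crc = new CRC32C()
        crc.update(bytes, 0, bytes.length)
        crc.getValue.toInt
      }

      // Verify a file against a previously recorded checksum, checking both the
      // CRC value and the file size.
      def verifySidecarChecksum(path: String, expected: RecordedChecksum): Unit = {
        val bytes = Files.readAllBytes(Paths.get(path))
        val computed = computeCrc32c(bytes)
        if (computed != expected.value || bytes.length.toLong != expected.mainFileSize) {
          throw new IllegalStateException(
            s"Checksum mismatch for $path: expected value=${expected.value} " +
            s"size=${expected.mainFileSize}, computed value=$computed size=${bytes.length}")
        }
      }
    }

Comparing mainFileSize in addition to the CRC value, as the patch does, is a cheap extra guard against accepting a corrupted or truncated file whose checksum happens to collide with the recorded one.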