[SPARK-51972][SS] State Store file integrity verification using checksum #50773


Open
wants to merge 5 commits into master

Conversation

@micheal-o (Contributor) commented on May 1, 2025:

What changes were proposed in this pull request?

This PR introduces file integrity verification for state store files by generating and uploading a checksum file alongside each state file, which allows us to verify the file's checksum during read. It also introduces a new Spark conf to enable/disable the feature (enabled by default). The feature can be enabled and later disabled, and vice versa, on the same checkpoint location, giving users the flexibility to turn it on or off as needed.

The implementation is completely backward compatible: it can be enabled on an existing query/checkpoint and disabled later without breaking the query/checkpoint.

Checksums are currently generated for the following state files: delta, snapshot, changelog, and zip. We can later enable this for other checkpoint files too, e.g. operator metadata files and the commit/offset logs.
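For illustration, here is a minimal sketch of the write/verify flow described above, using a plain JDK CRC32 and a hypothetical ".checksum" sidecar suffix; the actual implementation lives in the checkpoint file manager layer and its file naming and format may differ:

import java.nio.file.{Files, Path, Paths}
import java.util.zip.CRC32

object ChecksumSketch {
  // Hypothetical sidecar suffix; the real naming is defined by the implementation.
  private val ChecksumSuffix = ".checksum"

  private def crc32Of(bytes: Array[Byte]): Long = {
    val crc = new CRC32()
    crc.update(bytes)
    crc.getValue
  }

  // On write: after the main state file is uploaded, also persist its checksum
  // and size in a small sidecar file next to it.
  def writeChecksumFile(mainFile: Path): Unit = {
    val bytes = Files.readAllBytes(mainFile)
    val sidecar = Paths.get(mainFile.toString + ChecksumSuffix)
    Files.write(sidecar, s"CRC32,${crc32Of(bytes)},${bytes.length}".getBytes("UTF-8"))
  }

  // On read: if a sidecar exists, recompute and compare; a missing sidecar is not an
  // error, which is what lets the feature be toggled on an existing checkpoint.
  def verifyChecksumFile(mainFile: Path): Unit = {
    val sidecar = Paths.get(mainFile.toString + ChecksumSuffix)
    if (Files.exists(sidecar)) {
      val parts = new String(Files.readAllBytes(sidecar), "UTF-8").split(",")
      require(parts(0) == "CRC32", s"Unsupported checksum algorithm: ${parts(0)}")
      val bytes = Files.readAllBytes(mainFile)
      require(crc32Of(bytes) == parts(1).toLong && bytes.length == parts(2).toInt,
        s"Checksum verification failed for $mainFile")
    }
  }
}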

Why are the changes needed?

Integrity verification detects file corruption and provides additional diagnostic information when a corrupted file is read.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added additional tests

Was this patch authored or co-authored using generative AI tooling?

No

case class Checksum(
    algorithm: String,
    // Making this a string to be agnostic of algorithm used
    value: String,
Contributor:

This is the checksum value, right? Why not just use a byte array here?

@micheal-o (Contributor Author) replied on May 2, 2025:

Now using the original int; we can switch to a byte array later, once we have other algorithms, in order to stay agnostic. Added a comment for that.
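For reference, a sketch of the shape this discussion converges on, with field names following the snippet and the verification code further below; treat it as illustrative, not the exact definition in the PR:

// Checksum metadata persisted alongside a main checkpoint file.
case class Checksum(
    // e.g. "CRC32"; kept as a string so other algorithms can be added later
    algorithm: String,
    // CRC32 fits in an Int today; can become a byte array once algorithms
    // with wider digests are supported
    value: Int,
    // size of the main file, also compared during verification to guard
    // against checksum collisions
    mainFileSize: Long)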

case _: FileNotFoundException =>
  // Ignore if file has already been deleted
  // or the main file was created initially without checksum
  logWarning(log"Skipping deletion of checksum file ${MDC(PATH, checksumPath)} " +
Contributor:

Will this also apply to existing checkpoints that don't have checksum verification enabled? Should we log only if the feature is enabled?

Contributor Author:

Actually, the underlying delete call doesn't throw an error if the file is not found (that's part of the checkpoint file manager API contract), so I will just remove this catch anyway.

// Wait a bit for it to finish up in case there is any ongoing work
// Can consider making this timeout configurable, if needed
val timeoutMs = 100
if (!threadPool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
Contributor:

Should we wait a bit longer here? Also, should we force the shutdown after some timeout?
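A small sketch of the graceful-then-forced shutdown the comment is asking about, assuming a standard java.util.concurrent ExecutorService rather than the exact pool type used in the PR:

import java.util.concurrent.{ExecutorService, TimeUnit}

// Give in-flight checksum uploads a short grace period, then force-stop the pool
// so closing the provider can never hang on a stuck upload.
def shutdownChecksumPool(threadPool: ExecutorService, timeoutMs: Long = 100L): Unit = {
  threadPool.shutdown()
  if (!threadPool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
    threadPool.shutdownNow() // interrupt whatever is still running
    if (!threadPool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
      // the PR would log a warning here; println keeps the sketch dependency-free
      println(s"Checksum thread pool did not terminate within ${2 * timeoutMs} ms")
    }
  }
}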

// Compare file size too, in case of collision
if (expectedChecksum.value.toInt != computedChecksumValue ||
    expectedChecksum.mainFileSize != computedFileSize) {
  throw QueryExecutionErrors.checkpointFileChecksumVerificationFailed(
Contributor:

Can we move this to StateStoreErrors.scala?

Contributor Author:

I don't think we should mix this layer with state store code. File manager is at a lower layer and this can be used for non-state store files too.
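A self-contained sketch of the check performed in the snippet above, using the Checksum shape sketched earlier and a generic exception in line with the layering point in this reply (the real code raises QueryExecutionErrors.checkpointFileChecksumVerificationFailed):

// Both the checksum value and the main file size must match; comparing the size
// as well guards against the unlikely case of a checksum collision.
def verifyChecksum(
    expectedChecksum: Checksum,
    computedChecksumValue: Int,
    computedFileSize: Long,
    path: String): Unit = {
  if (expectedChecksum.value != computedChecksumValue ||
      expectedChecksum.mainFileSize != computedFileSize) {
    throw new IllegalStateException(
      s"Checksum verification failed for $path: expected " +
        s"(${expectedChecksum.value}, ${expectedChecksum.mainFileSize}), " +
        s"got ($computedChecksumValue, $computedFileSize)")
  }
}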

* compute the checksum. This is blocking the seekable apis from being used in the underlying stream
* */
private class CheckedSequentialInputStream(data: InputStream)
// TODO: Make the checksum algo configurable
Contributor:

Let's add it in this change itself? If you want to do this as a follow-up, let's create the SPARK JIRA for it and link it here.
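To make the TODO concrete, a sketch of what a configurable-algorithm, sequential checksum stream could look like on top of java.util.zip; the class and factory names here are illustrative, not the PR's:

import java.io.InputStream
import java.util.zip.{CheckedInputStream, CRC32, CRC32C, Checksum => JChecksum}

object ChecksumAlgorithms {
  // Map a configured algorithm name to a java.util.zip.Checksum implementation.
  def forName(algorithm: String): JChecksum = algorithm match {
    case "CRC32" => new CRC32()
    case "CRC32C" => new CRC32C()
    case other => throw new IllegalArgumentException(s"Unknown checksum algorithm: $other")
  }
}

// Sequential wrapper: every byte read through it updates the running checksum, so the
// final value only covers the whole file if it is read front to back. That is why
// seek-style access on the underlying stream has to be blocked while verifying.
class SequentialChecksumInputStream(data: InputStream, algorithm: String)
  extends CheckedInputStream(data, ChecksumAlgorithms.forName(algorithm)) {

  def checksumValue: Long = getChecksum.getValue
}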

    path: Path,
    private val checksumStream: CancellableFSDataOutputStream,
    private val uploadThreadPool: ExecutionContext)
// TODO: make the checksum algo configurable
Contributor:

same here

@@ -1072,7 +1072,8 @@ class StateDataSourceTransformWithStateSuite extends StateStoreMetricsTest
     val dfsRootDir = new File(tmpDir.getAbsolutePath + "/state/0/4")
     val fileManager = new RocksDBFileManager(
       dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration,
-      CompressionCodec.LZ4)
+      CompressionCodec.LZ4, fileChecksumEnabled = SQLConf.get.checkpointFileChecksumEnabled,
Contributor:

@micheal-o, do we need to add any additional tests for the state data source reader with integrity verification enabled?

Contributor Author:

No need for additional tests specific to state reader. Existing reader tests are exercising that already.


// turn off file checksum, and verify that the previously created checksum files
// will be deleted by maintenance
withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> false.toString) {
Contributor:

Can we also test against golden files created from a Spark 4.0 checkpoint?

Contributor Author:

Not sure I get what you mean here. I already have a test that verifies we can read a checkpoint written without checksum enabled.

Contributor:

Yeah, but to be safe we usually also confirm against released versions. You can generate the golden files for that version and run your tests against that path (for an example of how this directory is used, see https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala#L1188).

@@ -1459,6 +1472,193 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
     }
   }

+  testWithAllCodec("file checksum can be enabled and disabled for the same checkpoint") {
+    _ =>
Contributor:

Nit: move to the line above?

}
}

private def verifyChecksumFiles(
@anishshri-db (Contributor) commented on May 2, 2025:

Are we covering all the files we are adding the checksum for? I.e., do we have the necessary test coverage for all the file types now?

Contributor Author:

Yeah, this covers all the file types we are enabling it for. It runs for the HDFS and RocksDB state stores with all the different compression types; for RocksDB it also runs with Avro.
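For readers skimming the thread, this is roughly the kind of check such a helper performs; the helper name and the ".checksum" sidecar suffix below are assumed for illustration, not the PR's exact ones:

import java.io.File

// Assert that every main state file (delta, snapshot, changelog, zip) has a checksum
// sidecar when the feature is on, and none when it is off.
def assertChecksumSidecars(stateDir: File, expectChecksums: Boolean): Unit = {
  val names = stateDir.listFiles().map(_.getName).toSet
  val mainFiles = names.filter { n =>
    n.endsWith(".delta") || n.endsWith(".snapshot") ||
      n.endsWith(".changelog") || n.endsWith(".zip")
  }
  mainFiles.foreach { n =>
    // ".checksum" is an assumed sidecar suffix, used here only for illustration
    assert(names.contains(n + ".checksum") == expectChecksums,
      s"Unexpected checksum sidecar state for $n (expected present: $expectChecksums)")
  }
}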
