[SPARK-51972][SS] State Store file integrity verification using checksum #50773
Conversation
Resolved (outdated) review threads on:
- sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
case class Checksum(
    algorithm: String,
    // Making this a string to be agnostic of algorithm used
    value: String,
This is the checksum value, right? Why not just use a byte array here?
Now using the original int; we can switch to a byte array later when we have other algorithms, in order to stay agnostic. Added a comment for that.
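For reference, a minimal sketch of what the resolved definition might look like after this thread (not the exact code in this PR; the mainFileSize field is taken from the verification snippet further down):

```scala
// Sketch only: the value stays an Int (enough for CRC-style algorithms) and may
// become a byte array once other, wider algorithms are supported, keeping the
// representation algorithm-agnostic.
case class Checksum(
    algorithm: String,
    // Int for now; switch to Array[Byte] when other algorithms are added.
    value: Int,
    // File size is stored as well so a mismatch can be caught even on a
    // checksum collision.
    mainFileSize: Long)
```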
case _: FileNotFoundException =>
  // Ignore if file has already been deleted
  // or the main file was created initially without checksum
  logWarning(log"Skipping deletion of checksum file ${MDC(PATH, checksumPath)} " +
Will this also apply to existing checkpoints that don't have checksum verification enabled? Should we log only if the feature is enabled?
Actually, the underlying delete call doesn't throw an error if the file is not found (that is part of the CheckpointFileManager API contract), so I will just remove this catch.
// Wait a bit for it to finish up in case there is any ongoing work
// Can consider making this timeout configurable, if needed
val timeoutMs = 100
if (!threadPool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
Should we wait a bit longer here? Also, should we force the shutdown after some timeout?
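A minimal sketch of what that suggestion could look like (the helper name and timeout value are assumptions, not the PR's code):

```scala
import java.util.concurrent.{ExecutorService, TimeUnit}

// Hypothetical helper illustrating the suggestion: give in-flight checksum
// work a longer grace period, then force the shutdown if the pool still
// has not drained.
def shutdownChecksumPool(threadPool: ExecutorService): Unit = {
  threadPool.shutdown()
  if (!threadPool.awaitTermination(5, TimeUnit.SECONDS)) {
    // Interrupt whatever is still running after the grace period.
    threadPool.shutdownNow()
  }
}
```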
// Compare file size too, in case of collision
if (expectedChecksum.value.toInt != computedChecksumValue ||
    expectedChecksum.mainFileSize != computedFileSize) {
  throw QueryExecutionErrors.checkpointFileChecksumVerificationFailed(
Can we move this to StateStoreErrors.scala?
I don't think we should mix this layer with state store code. The file manager sits at a lower layer, and this can be used for non-state-store files too.
 * compute the checksum. This is blocking the seekable apis from being used in the underlying stream
 */
private class CheckedSequentialInputStream(data: InputStream)
// TODO: Make the checksum algo configurable
Let's add it in this change itself? If you want to do this as a follow-up, let's create the SPARK JIRA for it and link it here.
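For context, a self-contained sketch of a sequential checksum-computing stream wrapper, assuming CRC32 as the currently hard-coded algorithm. Making the algorithm configurable could mean passing the java.util.zip.Checksum implementation in as a parameter (e.g. CRC32 vs CRC32C). This is illustrative only, not the PR's CheckedSequentialInputStream:

```scala
import java.io.InputStream
import java.util.zip.{CheckedInputStream, Checksum, CRC32}

// Every byte read through this wrapper also feeds the checksum, which is why
// callers must read strictly sequentially: seeking would skip bytes and make
// the computed value meaningless.
class SequentialChecksumInputStream(
    data: InputStream,
    // Making the algorithm a constructor argument is one way to address the
    // TODO above (e.g. pass new java.util.zip.CRC32C instead).
    algo: Checksum = new CRC32) {

  private val checked = new CheckedInputStream(data, algo)

  def read(buf: Array[Byte]): Int = checked.read(buf)

  // Only meaningful once the whole stream has been consumed.
  def checksumValue: Long = checked.getChecksum.getValue
}
```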
    path: Path,
    private val checksumStream: CancellableFSDataOutputStream,
    private val uploadThreadPool: ExecutionContext)
// TODO: make the checksum algo configurable
same here
@@ -1072,7 +1072,8 @@ class StateDataSourceTransformWithStateSuite extends StateStoreMetricsTest
    val dfsRootDir = new File(tmpDir.getAbsolutePath + "/state/0/4")
    val fileManager = new RocksDBFileManager(
      dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration,
      CompressionCodec.LZ4)
      CompressionCodec.LZ4, fileChecksumEnabled = SQLConf.get.checkpointFileChecksumEnabled,
@micheal-o, do we need to add any additional tests for the state data source reader with integrity verification enabled?
No need for additional tests specific to state reader. Existing reader tests are exercising that already.
// turn off file checksum, and verify that the previously created checksum files
// will be deleted by maintenance
withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> false.toString) {
Can we also test against golden files created from a Spark 4.0 checkpoint?
Not sure I get what you mean here? I already have a test that verifies we can read a checkpoint written without checksum enabled.
Yea, but to be safe we usually also confirm against released versions. You can generate the golden files for that version and run your tests against that path (for an example of how this directory is used, see https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala#L1188 ).
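A rough sketch of the kind of golden-file test being suggested, meant to live in one of the existing suites (suite boilerplate omitted); the resource directory name is hypothetical and would come from a Spark 4.0 run of the same query:

```scala
// Hypothetical test sketch: read a checkpoint committed as a golden resource
// from a released Spark version with checksum verification turned on,
// mirroring the pattern in StateDataSourceReadSuite.
test("can read a golden checkpoint written by a released Spark version") {
  // Assumed resource path; the real directory would be generated with Spark 4.0.
  val goldenCheckpointDir =
    getClass.getResource("/structured-streaming/checkpoint-version-4.0.0/").getPath

  withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> "true") {
    // The state data source reader exercises the same file-manager read path
    // that checksum verification hooks into.
    val stateDf = spark.read.format("statestore").load(goldenCheckpointDir)
    assert(stateDf.collect().nonEmpty)
  }
}
```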
@@ -1459,6 +1472,193 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
    }
  }

  testWithAllCodec("file checksum can be enabled and disabled for the same checkpoint") {
    _ =>
nit: move to the line above?
  }
}

private def verifyChecksumFiles(
Are we covering all the files we are adding the checksum for? i.e., do we have the necessary test coverage for all the file types now?
Yeah, this covers all the file types we are enabling it for. It runs for both the HDFS and RocksDB state stores with all the different compression types. For RocksDB it also runs with Avro.
What changes were proposed in this pull request?
Introduces file integrity verification for state store files by generating and uploading a checksum file alongside each state file, so the checksum can be verified during read. A new Spark conf enables/disables this (enabled by default). It can be enabled and then disabled, and vice versa, on the same checkpoint location, giving users the flexibility to turn it on/off as needed.
This implementation is fully backward compatible: it can be enabled on an existing query/checkpoint and disabled later without breaking the query/checkpoint.
It is currently used for the following state files: delta, snapshot, changelog, and zip. We can later enable it for other checkpoint files too, e.g. operator metadata files and the commit/offset logs.
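As a usage sketch (the SQLConf entry name comes from this PR's diff; the session setup and values are illustrative only):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

val spark = SparkSession.builder().master("local[2]").getOrCreate()

// Enabled by default; flipping it off (or back on) for the same checkpoint
// location does not break the query, per the compatibility note above.
spark.conf.set(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key, "false")
```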
Why are the changes needed?
Integrity verification detects state store file corruption and surfaces the additional information needed to diagnose it.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added additional tests
Was this patch authored or co-authored using generative AI tooling?
No