[SPARK-51955] Adding release() to ReadStateStore interface and reusing ReadStore for Streaming Aggregations #50742
Conversation
Force-pushed from 14565a8 to c4c6c07.
loadStateStore(version, uniqueId, readOnly = false)
}

override def getWriteStore(
nit: rename to getWriteStoreFromReadStore?
    readStore: ReadStateStore,
    version: Long,
    uniqueId: Option[String] = None): StateStore = {
  assert(version == readStore.version)
Can you leave a comment or more informative error msg here?
+1
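For illustration, a sketch of the kind of message being asked for here; the wording is an assumption, not the PR's actual change:

```scala
// Hypothetical message; assumes the surrounding getWriteStore(readStore, version, ...) context.
assert(version == readStore.version,
  s"Cannot convert a read store loaded at version ${readStore.version} into a write store " +
  s"for version $version; the write store must target the same version the read store was loaded at.")
```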
@@ -565,6 +582,11 @@ trait StateStoreProvider {
    version: Long,
    stateStoreCkptId: Option[String] = None): StateStore

def getWriteStore(
add docs comment?
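For illustration, one possible shape for that doc comment; a sketch only, the wording is an assumption rather than the PR's final text:

```scala
/**
 * Converts an already-loaded read-only store into a writable store for the same
 * version, so the state loaded for the read path can be reused instead of being
 * aborted and reloaded.
 *
 * @param readStore the ReadStateStore previously loaded for this version
 * @param version   the version to write against; expected to match readStore.version
 * @param uniqueId  optional unique identifier for checkpointing
 * @return a writable StateStore that can be updated and committed
 */
def getWriteStore(
    readStore: ReadStateStore,
    version: Long,
    uniqueId: Option[String] = None): StateStore
```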
if (version < 0) {
  throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
}
hadoopConf.set(StreamExecution.RUN_ID_KEY, storeProviderId.queryRunId.toString)
Why are we setting this twice? Can you add more comments about what is going on here?
Yeah, prob don't need to set it multiple times.
partitionStores.put(partitionId, (store, false))

// Register a cleanup callback to be executed when the task completes
ctxt.addTaskCompletionListener[Unit](_ => {
Why are we adding the listeners here? Is it different from the one in mapPartitionsWithReadStateStore?
@ericm-db - I guess we did not register any listener before in the ReadStoreRDD path?
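For context, a minimal sketch of the cleanup pattern under discussion, assuming a ReadStateStore named `store` obtained earlier in compute(); the exact bookkeeping in the PR may differ:

```scala
// Register a task-completion listener so the store is always cleaned up when the
// task ends, even if the operator never gets to commit or release it explicitly.
ctxt.addTaskCompletionListener[Unit] { _ =>
  // release() (introduced in this PR) closes the read handle without discarding
  // the loaded state, unlike abort(), so a later write store can still reuse it.
  store.release()
}
```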
@ericm-db - in the PR description, please update it to say what the new usage paradigm will look like as well.
@ericm-db Can you update the PR description to be more specific about the inefficiency we are addressing here? Basically, in the current impl we always abort the read store, triggering an unnecessary reload of the state store.
@anishshri-db @liviazhu-db Sure, yeah, sounds good.
* @param uniqueId Optional unique identifier for checkpointing
* @return A writable StateStore instance that can be used to update and commit changes
*/
def getWriteStoreFromReadStore(
nit: should we rename as upgradeReadStoreToWriteStore?
@@ -34,7 +59,7 @@ abstract class BaseStateStoreRDD[T: ClassTag, U: ClassTag](
    operatorId: Long,
    sessionState: SessionState,
    @transient private val storeCoordinator: Option[StateStoreCoordinatorRef],
-   extraOptions: Map[String, String] = Map.empty) extends RDD[U](dataRDD) {
+   extraOptions: Map[String, String] = Map.empty) extends RDD[U](dataRDD) with Logging {
Intentional?
val inputIter = dataRDD.iterator(partition, ctxt)
val store = StateStore.getReadOnly(
  storeProviderId, keySchema, valueSchema, keyStateEncoderSpec, storeVersion,
- stateStoreCkptIds.map(_.apply(partition.index).head),
+ stateStoreCkptIds.map(_.apply(partitionId).head),
Was this a bug before?
No, these are equivalent though. I'll change it back.
- TaskContext.get().addTaskCompletionListener[Unit](_ => {
-   store.abort()
+ val taskContext = TaskContext.get()
+ taskContext.addTaskCompletionListener[Unit](_ => {
Why do we need it again here?
cc @cloud-fan
cc @cloud-fan - could you PTAL too? Especially around the RDD interactions? Thx
Force-pushed from 0c913e2 to f7d0e70.
Looks good! Could you add a test in StateStoreRDDSuite to check that the ThreadLocal logic correctly passes the read store to the write store too?
Yup, working on that rn!
* This allows a ReadStateStore to be reused by a subsequent StateStore operation.
*/
object StateStoreThreadLocalTracker {
  private val readStore: ThreadLocal[ReadStateStore] = new ThreadLocal[ReadStateStore]
Can we combine these into a single thread local? Just make them members of a case class?
Sure yeah
*/
object StateStoreThreadLocalTracker {
  /** Case class to hold both the store and its usage state */
  case class StoreInfo(store: ReadStateStore, usedForWriteStore: Boolean = false)
nit: maybe move members to a new line each?
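Putting the review suggestions together, one possible shape for the tracker; this is only a sketch, and the accessor names (setStore, markUsedForWriteStore, clear) are assumptions rather than the PR's final API:

```scala
object StateStoreThreadLocalTracker {

  /** Holds both the tracked read store and whether it was handed to a write store. */
  case class StoreInfo(
      store: ReadStateStore,
      usedForWriteStore: Boolean = false)

  // Single ThreadLocal carrying both pieces of state, per the review suggestion.
  private val storeInfo: ThreadLocal[StoreInfo] = new ThreadLocal[StoreInfo]

  def setStore(store: ReadStateStore): Unit =
    storeInfo.set(StoreInfo(store))

  def getStore: Option[ReadStateStore] =
    Option(storeInfo.get()).map(_.store)

  def markUsedForWriteStore(): Unit =
    Option(storeInfo.get()).foreach(info => storeInfo.set(info.copy(usedForWriteStore = true)))

  def isUsedForWriteStore: Boolean =
    Option(storeInfo.get()).exists(_.usedForWriteStore)

  def clear(): Unit =
    storeInfo.remove()
}
```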
@@ -194,6 +196,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
    log"for ${MDC(LogKeys.STATE_STORE_PROVIDER, this)}")
}

override def release(): Unit = {}
Can you add a new state and update the state here?
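As an illustration of what is being asked for, a self-contained sketch; the state names and the guard are assumptions, not the provider's actual code:

```scala
// Hypothetical lifecycle states, including a new Released value that release()
// transitions into so later misuse of the handle can be detected.
object SketchState extends Enumeration {
  val Updating, Committed, Aborted, Released = Value
}

class ReadStoreSketch {
  @volatile private var state: SketchState.Value = SketchState.Updating

  def release(): Unit = {
    // Unlike abort(), release() only marks this handle as finished; the loaded
    // state stays reusable by a follow-up write store.
    if (state == SketchState.Updating) {
      state = SketchState.Released
    }
  }

  def isReleased: Boolean = state == SketchState.Released
}
```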
What changes were proposed in this pull request?
- Adding a release() method to the ReadStateStore interface to properly close read stores without aborting them
- Implementing a getWriteStore() method that allows converting a read-only store to a writable store
- Creating a StateStoreRDDProvider interface for tracking state stores by partition ID
- Enhancing StateStoreRDD to find and reuse existing state stores through RDD lineage
- Improving task completion handling with proper cleanup listeners
Why are the changes needed?
Currently, stateful operations like aggregations follow a pattern where both read and write stores are opened simultaneously:
readStore.acquire()
writeStore.acquire()
writeStore.commit()
readStore.abort()
This pattern creates inefficiency because:
- The abort() call on the read store unnecessarily invalidates the store's state, causing subsequent operations to reload the entire state store from scratch
- Having two stores open simultaneously increases memory usage and can create contention issues
- The upcoming lock hardening changes will only allow one state store to be open at a time, making this pattern incompatible
With the new approach, the usage paradigm becomes:
readStore = getReadStore()
writeStore = getWriteStore(readStore)
writeStore.commit()
This new paradigm allows us to reuse an existing read store by converting it to a write store using getWriteStore(), and properly clean up resources using release() instead of abort() when operations complete successfully. This avoids the unnecessary reloading of state data and improves performance while being compatible with future lock hardening changes.
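To make the new paradigm concrete, a hedged sketch of how an operator-side caller might use it; getReadStore and getWriteStore mirror the names used above, while provider, version, key, and updatedValue are placeholders for illustration:

```scala
// Read phase: load (or reuse) the store read-only.
val readStore: ReadStateStore = provider.getReadStore(version)
val previous = readStore.get(key)   // state loaded here stays available below

// Write phase: upgrade the same loaded state into a writable store instead of
// aborting the read store and reloading everything from scratch.
val writeStore: StateStore = provider.getWriteStore(readStore, version)
writeStore.put(key, updatedValue)
writeStore.commit()

// If the read store is never upgraded (e.g. nothing to write), release() is the
// cleanup call, so the loaded state is not invalidated the way abort() would be.
```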
Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit tests
Was this patch authored or co-authored using generative AI tooling?
No