
[SPARK-51955] Adding release() to ReadStateStore interface and reusing ReadStore for Streaming Aggregations #50742


Open — wants to merge 11 commits into master from read-store-changes
Conversation

@ericm-db (Contributor) commented Apr 28, 2025

What changes were proposed in this pull request?

- Adding a release() method to the ReadStateStore interface to properly close read stores without aborting them
- Implementing a getWriteStore() method that allows converting a read-only store to a writable store
- Creating a StateStoreRDDProvider interface for tracking state stores by partition ID
- Enhancing StateStoreRDD to find and reuse existing state stores through RDD lineage
- Improving task completion handling with proper cleanup listeners

Why are the changes needed?

Currently, stateful operations like aggregations follow a pattern where both read and write stores are opened simultaneously:
```
readStore.acquire()
writeStore.acquire()
writeStore.commit()
readStore.abort()
```
This pattern creates inefficiency because:

- The abort() call on the read store unnecessarily invalidates the store's state, causing subsequent operations to reload the entire state store from scratch.
- Having two stores open simultaneously increases memory usage and can create contention issues.
- The upcoming lock-hardening changes will only allow one state store to be open at a time, making this pattern incompatible.

With the new approach, the usage paradigm becomes:
```
readStore = getReadStore()
writeStore = getWriteStore(readStore)
writeStore.commit()
```
This new paradigm allows us to reuse an existing read store by converting it to a write store using getWriteStore(), and properly clean up resources using release() instead of abort() when operations complete successfully. This avoids the unnecessary reloading of state data and improves performance while being compatible with future lock hardening changes.
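As an illustration of the lifecycle difference only — the traits and the converter below are simplified stand-ins, not the actual Spark interfaces — the old abort-based pattern versus the new release-based pattern can be sketched as:

```scala
// Illustration only: simplified stand-ins, not the real Spark interfaces.
trait ReadStateStore {
  def version: Long
  def release(): Unit // new in this PR: close without invalidating loaded state
  def abort(): Unit   // old path: throws away state, forcing a reload next time
}

trait StateStore extends ReadStateStore {
  def commit(): Long
}

// A toy store that records how it was closed.
class InMemoryStore(val version: Long) extends StateStore {
  var released = false
  var aborted = false
  def release(): Unit = released = true
  def abort(): Unit = aborted = true
  def commit(): Long = version + 1
}

// Hypothetical converter: the real provider upgrades the read store in place
// rather than opening a second store for the same partition.
def getWriteStore(readStore: ReadStateStore): StateStore =
  readStore.asInstanceOf[StateStore]

val readStore = new InMemoryStore(version = 10)
val writeStore = getWriteStore(readStore) // reuse, not a second open store
val committed = writeStore.commit()
readStore.release() // release instead of abort: loaded state stays warm
```

Because the read store is never aborted, its loaded state remains valid for the next micro-batch, which is the reload the PR is trying to avoid.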

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit tests

Was this patch authored or co-authored using generative AI tooling?

No

@ericm-db force-pushed the read-store-changes branch from 14565a8 to c4c6c07 on Apr 29, 2025 21:06
@ericm-db changed the title from [WIP] Adding release() to ReadStateStore interface to Adding release() to ReadStateStore interface and reusing ReadStore for Streaming Aggregations on Apr 29, 2025
@ericm-db changed the title to [SPARK-51955] Adding release() to ReadStateStore interface and reusing ReadStore for Streaming Aggregations on Apr 29, 2025
```scala
  loadStateStore(version, uniqueId, readOnly = false)
}

override def getWriteStore(
    readStore: ReadStateStore,
    version: Long,
    uniqueId: Option[String] = None): StateStore = {
  assert(version == readStore.version)
```

Comment (Contributor), on getWriteStore: nit: rename to getWriteStoreFromReadStore?

Comment (Contributor), on the assert: Can you leave a comment or more informative error msg here?

Comment (Contributor): +1

```scala
@@ -565,6 +582,11 @@ trait StateStoreProvider {
    version: Long,
    stateStoreCkptId: Option[String] = None): StateStore

  def getWriteStore(
```

Comment (Contributor): add docs comment?

```scala
if (version < 0) {
  throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
}
hadoopConf.set(StreamExecution.RUN_ID_KEY, storeProviderId.queryRunId.toString)
```

Comment (Contributor): Why are we setting this twice? Can you add more comments about what is going on here?

Comment (Contributor): Yeah, we probably don't need to set it multiple times.

```scala
partitionStores.put(partitionId, (store, false))

// Register a cleanup callback to be executed when the task completes
ctxt.addTaskCompletionListener[Unit](_ => {
```

Comment (Contributor): Why are we adding the listeners here? Is it different from the one in mapPartitionsWithReadStateStore?

Comment (Contributor): @ericm-db - I guess we did not register any listener before in the ReadStoreRDD path?
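For context on the pattern under discussion, the cleanup-listener idea can be sketched without Spark using a tiny stand-in for TaskContext (FakeTaskContext and its method names here are illustrative, not the real Spark API surface):

```scala
import scala.collection.mutable.ArrayBuffer

// Minimal stand-in for Spark's TaskContext listener mechanism.
class FakeTaskContext {
  private val listeners = ArrayBuffer.empty[() => Unit]
  def addTaskCompletionListener(f: () => Unit): Unit = listeners += f
  // Runs every registered listener, mimicking task completion.
  def markTaskCompleted(): Unit = listeners.foreach(_.apply())
}

// A store that records whether it was cleaned up.
final class TrackedStore {
  var closed = false
  def release(): Unit = closed = true
}

val ctxt = new FakeTaskContext
val store = new TrackedStore
// Register cleanup exactly once per acquired store; registering in both
// the RDD and mapPartitionsWithReadStateStore would release twice.
ctxt.addTaskCompletionListener(() => store.release())
ctxt.markTaskCompleted()
```

The reviewer's question boils down to this single-registration concern: if both code paths add a listener for the same store, the release logic must be idempotent or one registration must go.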

@anishshri-db (Contributor): @ericm-db - in the PR description, please update it to say what the new usage paradigm will look like as well.

@liviazhu-db (Contributor) commented Apr 29, 2025: @ericm-db Can you update the PR description to be more specific about the inefficiency we are addressing here? Basically that in the current impl, we always abort the read store, triggering an unnecessary reload of the state store.

@ericm-db (Author): @anishshri-db @liviazhu-db Sure, sounds good.

```scala
 * @param uniqueId Optional unique identifier for checkpointing
 * @return A writable StateStore instance that can be used to update and commit changes
 */
def getWriteStoreFromReadStore(
```

Comment (Contributor): nit: should we rename it to upgradeReadStoreToWriteStore?

```diff
@@ -34,7 +59,7 @@ abstract class BaseStateStoreRDD[T: ClassTag, U: ClassTag](
     operatorId: Long,
     sessionState: SessionState,
     @transient private val storeCoordinator: Option[StateStoreCoordinatorRef],
-    extraOptions: Map[String, String] = Map.empty) extends RDD[U](dataRDD) {
+    extraOptions: Map[String, String] = Map.empty) extends RDD[U](dataRDD) with Logging {
```

Comment (Contributor): intentional?


```diff
  val inputIter = dataRDD.iterator(partition, ctxt)
  val store = StateStore.getReadOnly(
    storeProviderId, keySchema, valueSchema, keyStateEncoderSpec, storeVersion,
-   stateStoreCkptIds.map(_.apply(partition.index).head),
+   stateStoreCkptIds.map(_.apply(partitionId).head),
```

Comment (Contributor): Was this a bug before?

@ericm-db (Author) Apr 30, 2025: No, these are equivalent. I'll change it back.

```diff
- TaskContext.get().addTaskCompletionListener[Unit](_ => {
-   store.abort()
+ val taskContext = TaskContext.get()
+ taskContext.addTaskCompletionListener[Unit](_ => {
```

Comment (Contributor): Why do we need it again here?

@ericm-db (Author): cc @cloud-fan

@anishshri-db (Contributor): cc @cloud-fan - could you PTAL too? Especially around the RDD interactions? Thanks!

@ericm-db force-pushed the read-store-changes branch from 0c913e2 to f7d0e70 on Apr 30, 2025 22:48
@liviazhu-db (Contributor) left a comment: Looks good! Could you add a test in StateStoreRDDSuite to check that the ThreadLocal logic correctly passes the read store to the write store too?

@ericm-db (Author) commented May 1, 2025: Yup, working on that right now!

```scala
 * This allows a ReadStateStore to be reused by a subsequent StateStore operation.
 */
object StateStoreThreadLocalTracker {
  private val readStore: ThreadLocal[ReadStateStore] = new ThreadLocal[ReadStateStore]
```

Comment (Contributor): Can we combine these into a single thread local? Just make them members of a case class?

@ericm-db (Author): Sure, yeah.

```scala
 */
object StateStoreThreadLocalTracker {
  /** Case class to hold both the store and its usage state */
  case class StoreInfo(store: ReadStateStore, usedForWriteStore: Boolean = false)
```

Comment (Contributor): nit: maybe move members to a new line each?
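Following the reviewer's suggestion above, the two thread-locals can be folded into one that holds a small case class. This is a sketch of the idea only, with a stand-in ReadStateStore trait and accessor names that are assumptions rather than the committed code:

```scala
// Stand-in for the real Spark interface; the actual trait has members.
trait ReadStateStore

object StateStoreThreadLocalTracker {
  /** Holds both the store and whether it was already upgraded to a write store. */
  case class StoreInfo(
      store: ReadStateStore,
      usedForWriteStore: Boolean = false)

  // One ThreadLocal instead of two parallel ones, so the pair can never
  // get out of sync across set/clear calls.
  private val info: ThreadLocal[Option[StoreInfo]] =
    new ThreadLocal[Option[StoreInfo]] {
      override def initialValue(): Option[StoreInfo] = None
    }

  def setStore(store: ReadStateStore): Unit =
    info.set(Some(StoreInfo(store)))

  def getStore: Option[ReadStateStore] = info.get().map(_.store)

  def markUsedForWriteStore(): Unit =
    info.get().foreach(i => info.set(Some(i.copy(usedForWriteStore = true))))

  def isUsedForWriteStore: Boolean = info.get().exists(_.usedForWriteStore)

  def clear(): Unit = info.remove()
}

// Usage: the write-store path first checks the tracker for a reusable
// read store left behind by the read path on the same task thread.
val tracked = new ReadStateStore {}
StateStoreThreadLocalTracker.setStore(tracked)
StateStoreThreadLocalTracker.markUsedForWriteStore()
val reused = StateStoreThreadLocalTracker.getStore
```

Using `copy` on the immutable case class keeps each update atomic from the perspective of the owning thread, and `clear()` delegates to `ThreadLocal.remove()` so task threads returned to the pool do not leak a stale store reference.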

```scala
@@ -194,6 +196,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
      log"for ${MDC(LogKeys.STATE_STORE_PROVIDER, this)}")
  }

  override def release(): Unit = {}
```

@liviazhu-db (Contributor) May 2, 2025: Can you add a new state and update the state here?

3 participants