File tree 3 files changed +5
-3
lines changed
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state
3 files changed +5
-3
lines changed Original file line number Diff line number Diff line change @@ -488,6 +488,7 @@ private[sql] class RocksDBStateStoreProvider
488
488
existingStore match {
489
489
case Some (stateStore : RocksDBStateStore ) =>
490
490
// Reuse existing store for getWriteStore case
491
+ StateStoreThreadLocalTracker .setUsedForWriteStore(true )
491
492
stateStore
492
493
case Some (_) =>
493
494
throw new IllegalArgumentException (" Existing store must be a RocksDBStateStore" )
Original file line number Diff line number Diff line change @@ -37,10 +37,13 @@ object StateStoreThreadLocalTracker {
37
37
def setStore (store : ReadStateStore ): Unit = readStore.set(store)
38
38
39
39
def getStore : Option [ReadStateStore ] = {
40
- usedForWriteStore.set(true )
41
40
Option (readStore.get())
42
41
}
43
42
43
+ def setUsedForWriteStore (used : Boolean ): Unit = {
44
+ usedForWriteStore.set(used)
45
+ }
46
+
44
47
def isUsedForWriteStore : Boolean = usedForWriteStore.get()
45
48
46
49
def clearStore (): Unit = readStore.remove()
Original file line number Diff line number Diff line change @@ -113,12 +113,10 @@ package object state {
113
113
val ctxt = TaskContext .get()
114
114
ctxt.addTaskCompletionListener[Unit ](_ => {
115
115
if (! StateStoreThreadLocalTracker .isUsedForWriteStore) store.release()
116
- StateStoreThreadLocalTracker .clearStore()
117
116
})
118
117
ctxt.addTaskFailureListener(new TaskFailureListener {
119
118
override def onTaskFailure (context : TaskContext , error : Throwable ): Unit =
120
119
if (! StateStoreThreadLocalTracker .isUsedForWriteStore) store.abort()
121
- StateStoreThreadLocalTracker .clearStore()
122
120
})
123
121
cleanedF(store, iter)
124
122
}
You can’t perform that action at this time.
0 commit comments