Skip to content

Commit

Permalink
Fix race condition when disposing WorkRunners in an active loop
Browse files Browse the repository at this point in the history
  • Loading branch information
DrewCarlson committed Aug 18, 2022
1 parent b091bd1 commit 90f274b
Show file tree
Hide file tree
Showing 11 changed files with 108 additions and 68 deletions.
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ plugins {
alias(libs.plugins.dokka)
alias(libs.plugins.binaryCompat) apply false
alias(libs.plugins.kover)
id("com.louiscad.complete-kotlin") version "1.1.0"
}

allprojects {
Expand Down
16 changes: 11 additions & 5 deletions mobiuskt-core/src/androidMain/kotlin/runners/LooperWorkRunner.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,22 @@ package kt.mobius.android.runners
import android.os.Handler
import android.os.Looper
import kt.mobius.runners.WorkRunner
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

/** A work runner that uses a [Looper] to run work. */
public open class LooperWorkRunner internal constructor(looper: Looper) : WorkRunner {
private val handler: Handler = Handler(looper)

@Volatile
private var disposed: Boolean = false
private val lock = ReentrantLock()

/** Will cancel all Runnables posted to this looper. */
override fun dispose() {
handler.removeCallbacksAndMessages(null)
disposed = true
lock.withLock {
handler.removeCallbacksAndMessages(null)
disposed = true
}
}

/**
Expand All @@ -23,8 +27,10 @@ public open class LooperWorkRunner internal constructor(looper: Looper) : WorkRu
* @param runnable the runnable you would like to execute
*/
override fun post(runnable: kt.mobius.runners.Runnable) {
if (disposed) return
handler.post(runnable)
lock.withLock {
if (disposed) return
handler.post(runnable)
}
}

public companion object {
Expand Down
4 changes: 4 additions & 0 deletions mobiuskt-core/src/commonMain/kotlin/runners/WorkRunner.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import kotlin.js.JsExport
@JsExport
public interface WorkRunner : Disposable {

/**
* Must discard any new [Runnable] immediately after dispose method of [Disposable] is
* called. Not doing this may result in undesired side effects, crashes, race conditions etc.
*/
@Suppress("NON_EXPORTABLE_TYPE")
public fun post(runnable: Runnable)
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package kt.mobius.runners

import kotlinx.atomicfu.locks.ReentrantLock
import kotlinx.atomicfu.locks.withLock
import platform.darwin.*

@Suppress("UnusedReceiverParameter")
Expand All @@ -21,14 +23,18 @@ public class DispatchQueueWorkRunner(
private val dispatchQueue: dispatch_queue_t
) : WorkRunner {

private val lock = ReentrantLock()

init {
check(Platform.memoryModel == MemoryModel.EXPERIMENTAL) {
"Using DispatchQueueWorkRunner requires the experimental memory model.\nSee https://github.com/JetBrains/kotlin/blob/master/kotlin-native/NEW_MM.md"
}
}

override fun post(runnable: Runnable) {
dispatch_async(dispatchQueue, runnable::run)
lock.withLock {
dispatch_async(dispatchQueue, runnable::run)
}
}

override fun dispose(): Unit = Unit
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,31 @@
package kt.mobius.runners

import kotlinx.atomicfu.locks.withLock
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

/** A [WorkRunner] implementation that is backed by an [ExecutorService]. */
public class ExecutorServiceWorkRunner(private val service: ExecutorService) : WorkRunner {

private val lock = ReentrantLock()

override fun post(runnable: Runnable) {
service.submit(runnable)
lock.withLock {
if (!service.isTerminated && !service.isShutdown) {
service.submit(runnable)
}
}
}

override fun dispose() {
try {
val runnables = service.shutdownNow()
lock.withLock {
val runnables = service.shutdownNow()

if (runnables.isNotEmpty()) {
println("Disposing ExecutorServiceWorkRunner with ${runnables.size} outstanding tasks.")
if (runnables.isNotEmpty()) {
println("Disposing ExecutorServiceWorkRunner with ${runnables.size} outstanding tasks.")
}
}

if (!service.awaitTermination(100, TimeUnit.MILLISECONDS)) {
Expand Down
22 changes: 13 additions & 9 deletions mobiuskt-core/src/jvmTest/kotlin/ExecutorServiceWorkRunnerTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -73,20 +73,24 @@ class ExecutorServiceWorkRunnerTest {
}

@Test
fun tasksShouldBeRejectedAfterDispose() {
fun tasksShouldBeSkippedAfterDispose() {
val output = CopyOnWriteArrayList<Int>()
val service = Executors.newSingleThreadExecutor()

underTest = ExecutorServiceWorkRunner(service)
underTest.dispose()

thrown.expect(RejectedExecutionException::class.java)
underTest.post {
output.add(1)
output.add(2)
}

underTest.post(
object : Runnable {
override fun run() {
println("ERROR: this shouldn't run/be printed!")
}
})
underTest.dispose()
underTest.post {
output.add(3)
output.add(4)
}

assertEquals(output, listOf(1, 2))
}

@Test
Expand Down
36 changes: 36 additions & 0 deletions mobiuskt-core/src/jvmTest/kotlin/MobiusLoopDisposalBehaviorTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import kt.mobius.Next.Companion.next
import kt.mobius.Next.Companion.noChange
import kt.mobius.disposables.Disposable
import kt.mobius.functions.Consumer
import kt.mobius.functions.Producer
import kt.mobius.runners.WorkRunner
import kt.mobius.runners.WorkRunners
import kt.mobius.test.RecordingConsumer
import kt.mobius.test.RecordingModelObserver
import kt.mobius.test.TestWorkRunner
Expand All @@ -17,6 +20,7 @@ import kt.mobius.testdomain.TestEffect
import kt.mobius.testdomain.TestEvent
import java.util.*
import java.util.concurrent.ExecutionException
import java.util.concurrent.Executors
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicBoolean
import javax.annotation.Nonnull
Expand Down Expand Up @@ -223,6 +227,38 @@ class MobiusLoopDisposalBehavior : MobiusLoopTest() {
}
}

@Test
fun shouldSafelyDisposeWhenDisposeAndEventsAreOnDifferentThreads() {
val random = Random()
val builder: MobiusLoop.Builder<String, TestEvent, TestEffect> = loop(update, effectHandler)
.eventRunner { WorkRunners.from(Executors.newFixedThreadPool(4)) }
val thread = Thread {
for (i in 0..99) {
mobiusLoop = builder.startFrom("foo")
try {
Thread.sleep(random.nextInt(10).toLong())
} catch (e: InterruptedException) {
throw java.lang.RuntimeException(e)
}
mobiusLoop.dispose()
}
}
thread.start()
for (i in 0..999) {
try {
mobiusLoop.dispatchEvent(TestEvent("bar"))
Thread.sleep(1)
} catch (e: java.lang.IllegalStateException) {
if (e.message != null) {
assertFalse(e.message!!.startsWith("Exception processing event"))
}
} catch (e: InterruptedException) {
throw RuntimeException(e)
}
}
thread.join()
}

internal class EmitDuringDisposeEventSource(private val event: TestEvent) : EventSource<TestEvent> {
@Nonnull
override fun subscribe(eventConsumer: Consumer<TestEvent>): Disposable {
Expand Down
43 changes: 0 additions & 43 deletions mobiuskt-core/src/jvmTest/kotlin/MobiusLoopErrorReporting.kt

This file was deleted.

12 changes: 10 additions & 2 deletions mobiuskt-core/src/nativeMain/kotlin/runners/NativeWorkRunner.kt
Original file line number Diff line number Diff line change
@@ -1,22 +1,30 @@
package kt.mobius.runners

import kotlinx.atomicfu.locks.SynchronizedObject
import kotlinx.atomicfu.locks.synchronized
import kotlin.native.concurrent.Worker

public class NativeWorkRunner(
private val worker: Worker
) : WorkRunner {

private val lock = SynchronizedObject()

init {
check(Platform.memoryModel == MemoryModel.EXPERIMENTAL) {
"Using NativeWorkRunner requires the experimental memory model.\nSee https://github.com/JetBrains/kotlin/blob/master/kotlin-native/NEW_MM.md"
}
}

override fun post(runnable: Runnable) {
worker.executeAfter(operation = runnable::run)
synchronized(lock) {
worker.executeAfter(operation = runnable::run)
}
}

override fun dispose() {
worker.requestTermination(processScheduledJobs = false)
synchronized(lock) {
worker.requestTermination(processScheduledJobs = false)
}
}
}
3 changes: 2 additions & 1 deletion mobiuskt-coroutines/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ plugins {
id("org.jetbrains.kotlinx.binary-compatibility-validator")
}

apply(plugin = "kotlinx-atomicfu")
apply(from = "../gradle/publishing.gradle.kts")

kotlin {
Expand Down Expand Up @@ -60,13 +61,13 @@ kotlin {
dependencies {
implementation(projects.mobiusktCore)
implementation(libs.coroutines.core)
implementation(libs.atomicfu)
}
}

named("commonTest") {
dependencies {
implementation(projects.mobiusktTest)
implementation(libs.atomicfu)
implementation(libs.coroutines.test)
implementation(kotlin("test-common"))
implementation(kotlin("test-annotations-common"))
Expand Down
13 changes: 10 additions & 3 deletions mobiuskt-coroutines/src/commonMain/kotlin/DispatcherWorkRunner.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package kt.mobius.flow

import kotlinx.atomicfu.locks.SynchronizedObject
import kotlinx.atomicfu.locks.synchronized
import kt.mobius.runners.WorkRunner
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
Expand All @@ -9,7 +11,7 @@ import kotlinx.coroutines.launch
import kt.mobius.runners.Runnable
import kt.mobius.runners.WorkRunners

@Suppress("unused")
@Suppress("UnusedReceiverParameter")
public fun WorkRunners.fromDispatcher(dispatcher: CoroutineDispatcher): WorkRunner {
return DispatcherWorkRunner(dispatcher)
}
Expand All @@ -19,13 +21,18 @@ public class DispatcherWorkRunner(
dispatcher: CoroutineDispatcher
) : WorkRunner {

private val lock = SynchronizedObject()
private val scope = CoroutineScope(dispatcher + SupervisorJob())

override fun post(runnable: Runnable) {
scope.launch { runnable.run() }
synchronized(lock) {
scope.launch { runnable.run() }
}
}

override fun dispose() {
scope.cancel()
synchronized(lock) {
scope.cancel()
}
}
}

0 comments on commit 90f274b

Please sign in to comment.