diff --git a/build.gradle.kts b/build.gradle.kts index 5dc1a607..9dbaf9ee 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -19,6 +19,7 @@ plugins { alias(libs.plugins.dokka) alias(libs.plugins.binaryCompat) apply false alias(libs.plugins.kover) + id("com.louiscad.complete-kotlin") version "1.1.0" } allprojects { diff --git a/mobiuskt-core/src/androidMain/kotlin/runners/LooperWorkRunner.kt b/mobiuskt-core/src/androidMain/kotlin/runners/LooperWorkRunner.kt index e868abe4..ac10713d 100644 --- a/mobiuskt-core/src/androidMain/kotlin/runners/LooperWorkRunner.kt +++ b/mobiuskt-core/src/androidMain/kotlin/runners/LooperWorkRunner.kt @@ -3,18 +3,22 @@ package kt.mobius.android.runners import android.os.Handler import android.os.Looper import kt.mobius.runners.WorkRunner +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock /** A work runner that uses a [Looper] to run work. */ public open class LooperWorkRunner internal constructor(looper: Looper) : WorkRunner { private val handler: Handler = Handler(looper) - @Volatile private var disposed: Boolean = false + private val lock = ReentrantLock() /** Will cancel all Runnables posted to this looper. */ override fun dispose() { - handler.removeCallbacksAndMessages(null) - disposed = true + lock.withLock { + handler.removeCallbacksAndMessages(null) + disposed = true + } } /** @@ -23,8 +27,10 @@ public open class LooperWorkRunner internal constructor(looper: Looper) : WorkRu * @param runnable the runnable you would like to execute */ override fun post(runnable: kt.mobius.runners.Runnable) { - if (disposed) return - handler.post(runnable) + lock.withLock { + if (disposed) return + handler.post(runnable) + } } public companion object { diff --git a/mobiuskt-core/src/commonMain/kotlin/runners/WorkRunner.kt b/mobiuskt-core/src/commonMain/kotlin/runners/WorkRunner.kt index 5edc2749..7d5ae7d8 100644 --- a/mobiuskt-core/src/commonMain/kotlin/runners/WorkRunner.kt +++ b/mobiuskt-core/src/commonMain/kotlin/runners/WorkRunner.kt @@ -7,6 +7,10 @@ import kotlin.js.JsExport @JsExport public interface WorkRunner : Disposable { + /** + * Must discard any new [Runnable] immediately after dispose method of [Disposable] is + * called. Not doing this may result in undesired side effects, crashes, race conditions etc. + */ @Suppress("NON_EXPORTABLE_TYPE") public fun post(runnable: Runnable) } diff --git a/mobiuskt-core/src/darwinMain/kotlin/runners/DispatchQueueWorkRunner.kt b/mobiuskt-core/src/darwinMain/kotlin/runners/DispatchQueueWorkRunner.kt index 43f8bd14..f82fc573 100644 --- a/mobiuskt-core/src/darwinMain/kotlin/runners/DispatchQueueWorkRunner.kt +++ b/mobiuskt-core/src/darwinMain/kotlin/runners/DispatchQueueWorkRunner.kt @@ -1,5 +1,7 @@ package kt.mobius.runners +import kotlinx.atomicfu.locks.ReentrantLock +import kotlinx.atomicfu.locks.withLock import platform.darwin.* @Suppress("UnusedReceiverParameter") @@ -21,6 +23,8 @@ public class DispatchQueueWorkRunner( private val dispatchQueue: dispatch_queue_t ) : WorkRunner { + private val lock = ReentrantLock() + init { check(Platform.memoryModel == MemoryModel.EXPERIMENTAL) { "Using DispatchQueueWorkRunner requires the experimental memory model.\nSee https://github.com/JetBrains/kotlin/blob/master/kotlin-native/NEW_MM.md" @@ -28,7 +32,9 @@ public class DispatchQueueWorkRunner( } override fun post(runnable: Runnable) { - dispatch_async(dispatchQueue, runnable::run) + lock.withLock { + dispatch_async(dispatchQueue, runnable::run) + } } override fun dispose(): Unit = Unit diff --git a/mobiuskt-core/src/jvmMain/kotlin/runners/ExecutorServiceWorkRunner.kt b/mobiuskt-core/src/jvmMain/kotlin/runners/ExecutorServiceWorkRunner.kt index 99459db3..c06c047a 100644 --- a/mobiuskt-core/src/jvmMain/kotlin/runners/ExecutorServiceWorkRunner.kt +++ b/mobiuskt-core/src/jvmMain/kotlin/runners/ExecutorServiceWorkRunner.kt @@ -1,21 +1,31 @@ package kt.mobius.runners +import kotlinx.atomicfu.locks.withLock import java.util.concurrent.ExecutorService import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.ReentrantLock /** A [WorkRunner] implementation that is backed by an [ExecutorService]. */ public class ExecutorServiceWorkRunner(private val service: ExecutorService) : WorkRunner { + private val lock = ReentrantLock() + override fun post(runnable: Runnable) { - service.submit(runnable) + lock.withLock { + if (!service.isTerminated && !service.isShutdown) { + service.submit(runnable) + } + } } override fun dispose() { try { - val runnables = service.shutdownNow() + lock.withLock { + val runnables = service.shutdownNow() - if (runnables.isNotEmpty()) { - println("Disposing ExecutorServiceWorkRunner with ${runnables.size} outstanding tasks.") + if (runnables.isNotEmpty()) { + println("Disposing ExecutorServiceWorkRunner with ${runnables.size} outstanding tasks.") + } } if (!service.awaitTermination(100, TimeUnit.MILLISECONDS)) { diff --git a/mobiuskt-core/src/jvmTest/kotlin/ExecutorServiceWorkRunnerTest.kt b/mobiuskt-core/src/jvmTest/kotlin/ExecutorServiceWorkRunnerTest.kt index 6474f5c4..58908529 100644 --- a/mobiuskt-core/src/jvmTest/kotlin/ExecutorServiceWorkRunnerTest.kt +++ b/mobiuskt-core/src/jvmTest/kotlin/ExecutorServiceWorkRunnerTest.kt @@ -73,20 +73,24 @@ class ExecutorServiceWorkRunnerTest { } @Test - fun tasksShouldBeRejectedAfterDispose() { + fun tasksShouldBeSkippedAfterDispose() { + val output = CopyOnWriteArrayList() val service = Executors.newSingleThreadExecutor() underTest = ExecutorServiceWorkRunner(service) - underTest.dispose() - thrown.expect(RejectedExecutionException::class.java) + underTest.post { + output.add(1) + output.add(2) + } - underTest.post( - object : Runnable { - override fun run() { - println("ERROR: this shouldn't run/be printed!") - } - }) + underTest.dispose() + underTest.post { + output.add(3) + output.add(4) + } + + assertEquals(output, listOf(1, 2)) } @Test diff --git a/mobiuskt-core/src/jvmTest/kotlin/MobiusLoopDisposalBehaviorTest.kt b/mobiuskt-core/src/jvmTest/kotlin/MobiusLoopDisposalBehaviorTest.kt index 4f086b03..b5db24a8 100644 --- a/mobiuskt-core/src/jvmTest/kotlin/MobiusLoopDisposalBehaviorTest.kt +++ b/mobiuskt-core/src/jvmTest/kotlin/MobiusLoopDisposalBehaviorTest.kt @@ -8,6 +8,9 @@ import kt.mobius.Next.Companion.next import kt.mobius.Next.Companion.noChange import kt.mobius.disposables.Disposable import kt.mobius.functions.Consumer +import kt.mobius.functions.Producer +import kt.mobius.runners.WorkRunner +import kt.mobius.runners.WorkRunners import kt.mobius.test.RecordingConsumer import kt.mobius.test.RecordingModelObserver import kt.mobius.test.TestWorkRunner @@ -17,6 +20,7 @@ import kt.mobius.testdomain.TestEffect import kt.mobius.testdomain.TestEvent import java.util.* import java.util.concurrent.ExecutionException +import java.util.concurrent.Executors import java.util.concurrent.Semaphore import java.util.concurrent.atomic.AtomicBoolean import javax.annotation.Nonnull @@ -223,6 +227,38 @@ class MobiusLoopDisposalBehavior : MobiusLoopTest() { } } + @Test + fun shouldSafelyDisposeWhenDisposeAndEventsAreOnDifferentThreads() { + val random = Random() + val builder: MobiusLoop.Builder = loop(update, effectHandler) + .eventRunner { WorkRunners.from(Executors.newFixedThreadPool(4)) } + val thread = Thread { + for (i in 0..99) { + mobiusLoop = builder.startFrom("foo") + try { + Thread.sleep(random.nextInt(10).toLong()) + } catch (e: InterruptedException) { + throw java.lang.RuntimeException(e) + } + mobiusLoop.dispose() + } + } + thread.start() + for (i in 0..999) { + try { + mobiusLoop.dispatchEvent(TestEvent("bar")) + Thread.sleep(1) + } catch (e: java.lang.IllegalStateException) { + if (e.message != null) { + assertFalse(e.message!!.startsWith("Exception processing event")) + } + } catch (e: InterruptedException) { + throw RuntimeException(e) + } + } + thread.join() + } + internal class EmitDuringDisposeEventSource(private val event: TestEvent) : EventSource { @Nonnull override fun subscribe(eventConsumer: Consumer): Disposable { diff --git a/mobiuskt-core/src/jvmTest/kotlin/MobiusLoopErrorReporting.kt b/mobiuskt-core/src/jvmTest/kotlin/MobiusLoopErrorReporting.kt deleted file mode 100644 index 564d12b9..00000000 --- a/mobiuskt-core/src/jvmTest/kotlin/MobiusLoopErrorReporting.kt +++ /dev/null @@ -1,43 +0,0 @@ -package kt.mobius - -import kt.mobius.MobiusLoop.Companion.create -import kt.mobius.runners.ExecutorServiceWorkRunner -import kt.mobius.runners.WorkRunner -import kt.mobius.test.RecordingModelObserver -import kt.mobius.testdomain.TestEvent -import java.util.concurrent.Executors -import kotlin.test.Test -import kotlin.test.assertFails -import kotlin.test.assertTrue - - -class MobiusLoopErrorReporting : MobiusLoopTest() { - - @Test - fun shouldIncludeEventInExceptionWhenDispatchFails() { - // given a loop - observer = RecordingModelObserver() - val executorService = Executors.newSingleThreadExecutor() - val eventRunner: WorkRunner = ExecutorServiceWorkRunner(executorService) - mobiusLoop = create( - update, - startModel, - startEffects, - effectHandler, - eventSource, - eventRunner, - immediateRunner - ) - - // whose event workrunner has been disposed. - eventRunner.dispose() - - // when an event is dispatched, - // then the exception contains a description of the event. - val error = assertFails { - mobiusLoop.dispatchEvent(TestEvent("print me in the exception message")) - } - - assertTrue(error.message?.contains("print me in the exception message") ?: false) - } -} \ No newline at end of file diff --git a/mobiuskt-core/src/nativeMain/kotlin/runners/NativeWorkRunner.kt b/mobiuskt-core/src/nativeMain/kotlin/runners/NativeWorkRunner.kt index cf4fbaaa..2055a226 100644 --- a/mobiuskt-core/src/nativeMain/kotlin/runners/NativeWorkRunner.kt +++ b/mobiuskt-core/src/nativeMain/kotlin/runners/NativeWorkRunner.kt @@ -1,11 +1,15 @@ package kt.mobius.runners +import kotlinx.atomicfu.locks.SynchronizedObject +import kotlinx.atomicfu.locks.synchronized import kotlin.native.concurrent.Worker public class NativeWorkRunner( private val worker: Worker ) : WorkRunner { + private val lock = SynchronizedObject() + init { check(Platform.memoryModel == MemoryModel.EXPERIMENTAL) { "Using NativeWorkRunner requires the experimental memory model.\nSee https://github.com/JetBrains/kotlin/blob/master/kotlin-native/NEW_MM.md" @@ -13,10 +17,14 @@ public class NativeWorkRunner( } override fun post(runnable: Runnable) { - worker.executeAfter(operation = runnable::run) + synchronized(lock) { + worker.executeAfter(operation = runnable::run) + } } override fun dispose() { - worker.requestTermination(processScheduledJobs = false) + synchronized(lock) { + worker.requestTermination(processScheduledJobs = false) + } } } diff --git a/mobiuskt-coroutines/build.gradle.kts b/mobiuskt-coroutines/build.gradle.kts index d0dcea58..3f54f079 100644 --- a/mobiuskt-coroutines/build.gradle.kts +++ b/mobiuskt-coroutines/build.gradle.kts @@ -3,6 +3,7 @@ plugins { id("org.jetbrains.kotlinx.binary-compatibility-validator") } +apply(plugin = "kotlinx-atomicfu") apply(from = "../gradle/publishing.gradle.kts") kotlin { @@ -60,13 +61,13 @@ kotlin { dependencies { implementation(projects.mobiusktCore) implementation(libs.coroutines.core) + implementation(libs.atomicfu) } } named("commonTest") { dependencies { implementation(projects.mobiusktTest) - implementation(libs.atomicfu) implementation(libs.coroutines.test) implementation(kotlin("test-common")) implementation(kotlin("test-annotations-common")) diff --git a/mobiuskt-coroutines/src/commonMain/kotlin/DispatcherWorkRunner.kt b/mobiuskt-coroutines/src/commonMain/kotlin/DispatcherWorkRunner.kt index 88480cfc..61b0b799 100644 --- a/mobiuskt-coroutines/src/commonMain/kotlin/DispatcherWorkRunner.kt +++ b/mobiuskt-coroutines/src/commonMain/kotlin/DispatcherWorkRunner.kt @@ -1,5 +1,7 @@ package kt.mobius.flow +import kotlinx.atomicfu.locks.SynchronizedObject +import kotlinx.atomicfu.locks.synchronized import kt.mobius.runners.WorkRunner import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope @@ -9,7 +11,7 @@ import kotlinx.coroutines.launch import kt.mobius.runners.Runnable import kt.mobius.runners.WorkRunners -@Suppress("unused") +@Suppress("UnusedReceiverParameter") public fun WorkRunners.fromDispatcher(dispatcher: CoroutineDispatcher): WorkRunner { return DispatcherWorkRunner(dispatcher) } @@ -19,13 +21,18 @@ public class DispatcherWorkRunner( dispatcher: CoroutineDispatcher ) : WorkRunner { + private val lock = SynchronizedObject() private val scope = CoroutineScope(dispatcher + SupervisorJob()) override fun post(runnable: Runnable) { - scope.launch { runnable.run() } + synchronized(lock) { + scope.launch { runnable.run() } + } } override fun dispose() { - scope.cancel() + synchronized(lock) { + scope.cancel() + } } }