Skip to content

Commit

Permalink
optimize the implementation of flowFromSuspend and `flowFromNonSusp…
Browse files Browse the repository at this point in the history
…end` (#203)

* add flowFromSuspend tests

* done for FlowFromSuspend

* done for FlowFromSuspend

* done for FlowFromSuspend

* done for FlowFromSuspend

* test FlowFromNonSuspendTest

* FlowFromNonSuspend

* changelog

* DONE_VALUE jvmFIeld
  • Loading branch information
hoc081098 authored Nov 18, 2023
1 parent 9d5122c commit 47ebb1a
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 6 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Change Log

## [Unreleased] - TBD

### Changed

- Optimize the implementation of `flowFromSuspend` and `flowFromNonSuspend`,
it is just an internal change, it does not affect the public API and behavior.

## [0.7.4] - Nov 12, 2023

### Changed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ package com.hoc081098.flowext

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.flow

/**
* Creates a _cold_ flow that produces a single value from the given [function].
Expand Down Expand Up @@ -66,4 +65,11 @@ import kotlinx.coroutines.flow.flow
*
* @see flowFromSuspend
*/
public fun <T> flowFromNonSuspend(function: () -> T): Flow<T> = flow { return@flow emit(function()) }
public fun <T> flowFromNonSuspend(function: () -> T): Flow<T> =
FlowFromNonSuspend(function)

// We don't need to use `AbstractFlow` here because we only emit a single value without a context switch,
// and we guarantee all Flow's constraints: context preservation and exception transparency.
private class FlowFromNonSuspend<T>(private val function: () -> T) : Flow<T> {
override suspend fun collect(collector: FlowCollector<T>) = collector.emit(function())
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ package com.hoc081098.flowext

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.flow

/**
* Creates a _cold_ flow that produces a single value from the given [function].
Expand Down Expand Up @@ -71,4 +70,10 @@ import kotlinx.coroutines.flow.flow
* @see flowFromNonSuspend
*/
public fun <T> flowFromSuspend(function: suspend () -> T): Flow<T> =
flow { return@flow emit(function()) }
FlowFromSuspend(function)

// We don't need to use `AbstractFlow` here because we only emit a single value without a context switch,
// and we guarantee all Flow's constraints: context preservation and exception transparency.
private class FlowFromSuspend<T>(private val function: suspend () -> T) : Flow<T> {
override suspend fun collect(collector: FlowCollector<T>) = collector.emit(function())
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
package com.hoc081098.flowext.internal

import com.hoc081098.flowext.utils.Symbol
import kotlin.jvm.JvmField

/*
* Symbol used to indicate that the flow is complete.
* It should never leak to the outside world.
*/
@JvmField
internal val DONE_VALUE = Symbol("DONE_VALUE")
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,16 @@
package com.hoc081098.flowext

import com.hoc081098.flowext.utils.BaseTest
import com.hoc081098.flowext.utils.NamedDispatchers
import com.hoc081098.flowext.utils.TestException
import com.hoc081098.flowext.utils.assertFailsWith
import com.hoc081098.flowext.utils.test
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.take

@ExperimentalCoroutinesApi
class FlowFromNonSuspendTest : BaseTest() {
Expand All @@ -49,9 +55,38 @@ class FlowFromNonSuspendTest : BaseTest() {
}

@Test
fun deferFactoryThrows() = runTest {
fun flowFromNonSuspendFunctionThrows() = runTest {
val testException = TestException()

flowFromNonSuspend<Int> { throw testException }.test(listOf(Event.Error(testException)))
}

@Test
fun flowFromNonSuspendCancellation() = runTest {
fun throwsCancellationException(): Unit = throw CancellationException("Flow was cancelled")
assertFailsWith<CancellationException>(
flowFromNonSuspend {
val i = 1 + 2
throwsCancellationException()
i + 3
},
)
}

@Test
fun flowFromNonSuspenddTake() = runTest {
flowFromNonSuspend { 100 }
.take(1)
.test(listOf(Event.Value(100), Event.Complete))
}

@Test
fun testContextPreservation1() = runTest {
val flow = flowFromNonSuspend {
assertEquals("OK", NamedDispatchers.name())
42
}.flowOn(NamedDispatchers("OK"))

flow.test(listOf(Event.Value(42), Event.Complete))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,19 @@
package com.hoc081098.flowext

import com.hoc081098.flowext.utils.BaseTest
import com.hoc081098.flowext.utils.NamedDispatchers
import com.hoc081098.flowext.utils.TestException
import com.hoc081098.flowext.utils.assertFailsWith
import com.hoc081098.flowext.utils.test
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.withContext

@ExperimentalCoroutinesApi
class FlowFromSuspendTest : BaseTest() {
Expand All @@ -51,9 +59,64 @@ class FlowFromSuspendTest : BaseTest() {
}

@Test
fun deferFactoryThrows() = runTest {
fun flowFromSuspendFunctionThrows() = runTest {
val testException = TestException()

flowFromSuspend<Int> { throw testException }.test(listOf(Event.Error(testException)))
}

@Test
fun flowFromSuspendCancellation() = runTest {
fun throwsCancellationException(): Unit = throw CancellationException("Flow was cancelled")
assertFailsWith<CancellationException>(
flowFromSuspend {
val i = 1 + 2
throwsCancellationException()
i + 3
},
)
}

@Test
fun flowFromSuspendTake() = runTest {
flowFromSuspend { 100 }.take(1)
.test(listOf(Event.Value(100), Event.Complete))

flowFromSuspend {
delay(100)
100
}
.take(1)
.test(listOf(Event.Value(100), Event.Complete))
}

@Test
fun testContextPreservation1() = runTest {
val flow = flowFromSuspend {
assertEquals("OK", NamedDispatchers.name())

withContext(Dispatchers.Default) { delay(100) }

assertEquals("OK", NamedDispatchers.name())
42
}.flowOn(NamedDispatchers("OK"))

flow.test(listOf(Event.Value(42), Event.Complete))
}

@Test
fun testContextPreservation2() = runTest {
val flow = flowFromSuspend {
assertEquals("OK", NamedDispatchers.name())

withContext(Dispatchers.Default) {
delay(100)
42
}.also {
assertEquals("OK", NamedDispatchers.name())
}
}.flowOn(NamedDispatchers("OK"))

flow.test(listOf(Event.Value(42), Event.Complete))
}
}

0 comments on commit 47ebb1a

Please sign in to comment.