Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize the implementation of flowFromSuspend and flowFromNonSuspend #203

Merged
merged 9 commits into from
Nov 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Change Log

## [Unreleased] - TBD

### Changed

- Optimize the implementation of `flowFromSuspend` and `flowFromNonSuspend`,
it is just an internal change, it does not affect the public API and behavior.

## [0.7.4] - Nov 12, 2023

### Changed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ package com.hoc081098.flowext

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.flow

/**
* Creates a _cold_ flow that produces a single value from the given [function].
Expand Down Expand Up @@ -66,4 +65,11 @@ import kotlinx.coroutines.flow.flow
*
* @see flowFromSuspend
*/
public fun <T> flowFromNonSuspend(function: () -> T): Flow<T> = flow { return@flow emit(function()) }
public fun <T> flowFromNonSuspend(function: () -> T): Flow<T> =
FlowFromNonSuspend(function)

// We don't need to use `AbstractFlow` here because we only emit a single value without a context switch,
// and we guarantee all Flow's constraints: context preservation and exception transparency.
private class FlowFromNonSuspend<T>(private val function: () -> T) : Flow<T> {
override suspend fun collect(collector: FlowCollector<T>) = collector.emit(function())
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ package com.hoc081098.flowext

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.flow

/**
* Creates a _cold_ flow that produces a single value from the given [function].
Expand Down Expand Up @@ -71,4 +70,10 @@ import kotlinx.coroutines.flow.flow
* @see flowFromNonSuspend
*/
public fun <T> flowFromSuspend(function: suspend () -> T): Flow<T> =
flow { return@flow emit(function()) }
FlowFromSuspend(function)

// We don't need to use `AbstractFlow` here because we only emit a single value without a context switch,
// and we guarantee all Flow's constraints: context preservation and exception transparency.
private class FlowFromSuspend<T>(private val function: suspend () -> T) : Flow<T> {
override suspend fun collect(collector: FlowCollector<T>) = collector.emit(function())
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
package com.hoc081098.flowext.internal

import com.hoc081098.flowext.utils.Symbol
import kotlin.jvm.JvmField

/*
* Symbol used to indicate that the flow is complete.
* It should never leak to the outside world.
*/
@JvmField
internal val DONE_VALUE = Symbol("DONE_VALUE")
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,16 @@
package com.hoc081098.flowext

import com.hoc081098.flowext.utils.BaseTest
import com.hoc081098.flowext.utils.NamedDispatchers
import com.hoc081098.flowext.utils.TestException
import com.hoc081098.flowext.utils.assertFailsWith
import com.hoc081098.flowext.utils.test
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.take

@ExperimentalCoroutinesApi
class FlowFromNonSuspendTest : BaseTest() {
Expand All @@ -49,9 +55,38 @@ class FlowFromNonSuspendTest : BaseTest() {
}

@Test
fun deferFactoryThrows() = runTest {
fun flowFromNonSuspendFunctionThrows() = runTest {
val testException = TestException()

flowFromNonSuspend<Int> { throw testException }.test(listOf(Event.Error(testException)))
}

@Test
fun flowFromNonSuspendCancellation() = runTest {
fun throwsCancellationException(): Unit = throw CancellationException("Flow was cancelled")
assertFailsWith<CancellationException>(
flowFromNonSuspend {
val i = 1 + 2
throwsCancellationException()
i + 3
},
)
}

@Test
fun flowFromNonSuspenddTake() = runTest {
flowFromNonSuspend { 100 }
.take(1)
.test(listOf(Event.Value(100), Event.Complete))
}

@Test
fun testContextPreservation1() = runTest {
val flow = flowFromNonSuspend {
assertEquals("OK", NamedDispatchers.name())
42
}.flowOn(NamedDispatchers("OK"))

flow.test(listOf(Event.Value(42), Event.Complete))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,19 @@
package com.hoc081098.flowext

import com.hoc081098.flowext.utils.BaseTest
import com.hoc081098.flowext.utils.NamedDispatchers
import com.hoc081098.flowext.utils.TestException
import com.hoc081098.flowext.utils.assertFailsWith
import com.hoc081098.flowext.utils.test
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.withContext

@ExperimentalCoroutinesApi
class FlowFromSuspendTest : BaseTest() {
Expand All @@ -51,9 +59,64 @@ class FlowFromSuspendTest : BaseTest() {
}

@Test
fun deferFactoryThrows() = runTest {
fun flowFromSuspendFunctionThrows() = runTest {
val testException = TestException()

flowFromSuspend<Int> { throw testException }.test(listOf(Event.Error(testException)))
}

@Test
fun flowFromSuspendCancellation() = runTest {
fun throwsCancellationException(): Unit = throw CancellationException("Flow was cancelled")
assertFailsWith<CancellationException>(
flowFromSuspend {
val i = 1 + 2
throwsCancellationException()
i + 3
},
)
}

@Test
fun flowFromSuspendTake() = runTest {
flowFromSuspend { 100 }.take(1)
.test(listOf(Event.Value(100), Event.Complete))

flowFromSuspend {
delay(100)
100
}
.take(1)
.test(listOf(Event.Value(100), Event.Complete))
}

@Test
fun testContextPreservation1() = runTest {
val flow = flowFromSuspend {
assertEquals("OK", NamedDispatchers.name())

withContext(Dispatchers.Default) { delay(100) }

assertEquals("OK", NamedDispatchers.name())
42
}.flowOn(NamedDispatchers("OK"))

flow.test(listOf(Event.Value(42), Event.Complete))
}

@Test
fun testContextPreservation2() = runTest {
val flow = flowFromSuspend {
assertEquals("OK", NamedDispatchers.name())

withContext(Dispatchers.Default) {
delay(100)
42
}.also {
assertEquals("OK", NamedDispatchers.name())
}
}.flowOn(NamedDispatchers("OK"))

flow.test(listOf(Event.Value(42), Event.Complete))
}
}