diff --git a/api/FlowExt.api b/api/FlowExt.api index e77a0e93..ac2c7dd0 100644 --- a/api/FlowExt.api +++ b/api/FlowExt.api @@ -203,6 +203,18 @@ public final class com/hoc081098/flowext/PairwiseKt { public static final fun zipWithNext (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; } +public abstract interface annotation class com/hoc081098/flowext/PublishSelectorDsl : java/lang/annotation/Annotation { +} + +public abstract interface annotation class com/hoc081098/flowext/PublishSelectorSharedFlowDsl : java/lang/annotation/Annotation { +} + +public final class com/hoc081098/flowext/PublishWithSelectorKt { + public static final fun main (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun main ([Ljava/lang/String;)V + public static final fun publish (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; +} + public final class com/hoc081098/flowext/RaceKt { public static final fun amb (Ljava/lang/Iterable;)Lkotlinx/coroutines/flow/Flow; public static final fun amb (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;[Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; @@ -243,6 +255,21 @@ public final class com/hoc081098/flowext/ScanWithKt { public static final fun scanWith (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; } +public abstract interface class com/hoc081098/flowext/SelectorScope { + public abstract fun select (Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; +} + +public abstract interface class com/hoc081098/flowext/SelectorSharedFlowScope { + public abstract fun shareIn (Lkotlinx/coroutines/flow/Flow;ILkotlinx/coroutines/flow/SharingStarted;)Lkotlinx/coroutines/flow/SharedFlow; + public abstract fun shareIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;I)Lkotlinx/coroutines/flow/SharedFlow; +} + +public final class com/hoc081098/flowext/SelectorSharedFlowScope$DefaultImpls { + public static fun shareIn (Lcom/hoc081098/flowext/SelectorSharedFlowScope;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;I)Lkotlinx/coroutines/flow/SharedFlow; + public static synthetic fun shareIn$default (Lcom/hoc081098/flowext/SelectorSharedFlowScope;Lkotlinx/coroutines/flow/Flow;ILkotlinx/coroutines/flow/SharingStarted;ILjava/lang/Object;)Lkotlinx/coroutines/flow/SharedFlow; + public static synthetic fun shareIn$default (Lcom/hoc081098/flowext/SelectorSharedFlowScope;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;IILjava/lang/Object;)Lkotlinx/coroutines/flow/SharedFlow; +} + public final class com/hoc081098/flowext/SelectorsKt { public static final fun select (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun select (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function6;)Lkotlinx/coroutines/flow/Flow; diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/internal/AtomicRef.kt b/src/commonMain/kotlin/com/hoc081098/flowext/internal/AtomicRef.kt index 3d340cec..421935da 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/internal/AtomicRef.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/internal/AtomicRef.kt @@ -29,3 +29,50 @@ internal expect class AtomicRef(initialValue: T) { fun compareAndSet(expect: T, update: T): Boolean } + +// Copy from: https://github.com/arrow-kt/arrow/blob/6fb6a75b131f5bbb272611bf277e263ff791cb67/arrow-libs/core/arrow-atomic/src/commonMain/kotlin/arrow/atomic/Atomic.kt#L44 + +/** + * Infinite loop that reads this atomic variable and performs the specified [action] on its value. + */ +internal inline fun AtomicRef.loop(action: (T) -> Unit): Nothing { + while (true) { + action(value) + } +} + +internal fun AtomicRef.tryUpdate(function: (T) -> T): Boolean { + val cur = value + val upd = function(cur) + return compareAndSet(cur, upd) +} + +internal inline fun AtomicRef.update(function: (T) -> T) { + while (true) { + val cur = value + val upd = function(cur) + if (compareAndSet(cur, upd)) return + } +} + +/** + * Updates variable atomically using the specified [function] of its value and returns its old value. + */ +internal inline fun AtomicRef.getAndUpdate(function: (T) -> T): T { + while (true) { + val cur = value + val upd = function(cur) + if (compareAndSet(cur, upd)) return cur + } +} + +/** + * Updates variable atomically using the specified [function] of its value and returns its new value. + */ +internal inline fun AtomicRef.updateAndGet(function: (T) -> T): T { + while (true) { + val cur = value + val upd = function(cur) + if (compareAndSet(cur, upd)) return upd + } +} diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/internal/SimpleLazy.kt b/src/commonMain/kotlin/com/hoc081098/flowext/internal/SimpleLazy.kt new file mode 100644 index 00000000..92dcdbd3 --- /dev/null +++ b/src/commonMain/kotlin/com/hoc081098/flowext/internal/SimpleLazy.kt @@ -0,0 +1,61 @@ +/* + * MIT License + * + * Copyright (c) 2021-2023 Petrus Nguyễn Thái Học + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:Suppress("ktlint:standard:property-naming") + +package com.hoc081098.flowext.internal + +import kotlin.concurrent.Volatile +import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.internal.SynchronizedObject +import kotlinx.coroutines.internal.synchronized + +// TODO: Remove SynchronizedObject +@OptIn(InternalCoroutinesApi::class) +internal class SimpleLazy( + initializer: () -> T, +) : SynchronizedObject() { + private var _initializer: (() -> T)? = initializer + + @Volatile + private var value: T? = null + + fun getValue(): T = + value ?: synchronized(this) { + value ?: _initializer!!().also { + _initializer = null + value = it + } + } + + fun getOrNull(): T? = value + + fun clear() { + _initializer = null + value = null + } +} + +internal fun simpleLazyOf(initializer: () -> T): SimpleLazy = + SimpleLazy(initializer) diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/internal/SimpleSuspendLazy.kt b/src/commonMain/kotlin/com/hoc081098/flowext/internal/SimpleSuspendLazy.kt new file mode 100644 index 00000000..9a6ae23b --- /dev/null +++ b/src/commonMain/kotlin/com/hoc081098/flowext/internal/SimpleSuspendLazy.kt @@ -0,0 +1,59 @@ +/* + * MIT License + * + * Copyright (c) 2021-2023 Petrus Nguyễn Thái Học + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:Suppress("ktlint:standard:property-naming") + +package com.hoc081098.flowext.internal + +import kotlin.concurrent.Volatile +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock + +internal class SimpleSuspendLazy( + initializer: suspend () -> T, +) { + private val mutex = Mutex() + + @Volatile + private var _initializer: (suspend () -> T)? = initializer + + @Volatile + private var value: T? = null + + suspend fun getValue(): T = + value ?: mutex.withLock { + value ?: _initializer!!().also { + _initializer = null + value = it + } + } + + fun clear() { + _initializer = null + value = null + } +} + +internal fun simpleSuspendLazy(initializer: suspend () -> T): SimpleSuspendLazy = + SimpleSuspendLazy(initializer) diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt b/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt new file mode 100644 index 00000000..722db892 --- /dev/null +++ b/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt @@ -0,0 +1,493 @@ +/* + * MIT License + * + * Copyright (c) 2021-2023 Petrus Nguyễn Thái Học + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.hoc081098.flowext + +import com.hoc081098.flowext.internal.AtomicRef +import com.hoc081098.flowext.internal.SimpleLazy +import com.hoc081098.flowext.internal.SimpleSuspendLazy +import com.hoc081098.flowext.internal.loop +import com.hoc081098.flowext.internal.simpleLazyOf +import com.hoc081098.flowext.internal.simpleSuspendLazy +import kotlin.jvm.JvmField +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.SharingStarted +import kotlinx.coroutines.flow.consumeAsFlow +import kotlinx.coroutines.flow.emitAll +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.filterIsInstance +import kotlinx.coroutines.flow.flatMapMerge +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.merge +import kotlinx.coroutines.flow.onCompletion +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.shareIn as kotlinXFlowShareIn +import kotlinx.coroutines.flow.take +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.launch + +@FlowExtPreview +@DslMarker +public annotation class PublishSelectorDsl + +@FlowExtPreview +@DslMarker +public annotation class PublishSelectorSharedFlowDsl + +@FlowExtPreview +@PublishSelectorSharedFlowDsl +public sealed interface SelectorSharedFlowScope { + @PublishSelectorSharedFlowDsl + public fun Flow.shareIn( + replay: Int = 0, + started: SharingStarted = SharingStarted.Lazily, + ): SharedFlow + + /** @suppress */ + @Deprecated( + level = DeprecationLevel.ERROR, + message = "This function is not supported", + replaceWith = ReplaceWith("this.shareIn(replay, started)"), + ) + public fun Flow.shareIn( + scope: CoroutineScope, + started: SharingStarted, + replay: Int = 0, + ): SharedFlow = throw UnsupportedOperationException("Not implemented, should not be called") +} + +@FlowExtPreview +private typealias SelectorFunction = suspend SelectorSharedFlowScope.(Flow) -> Flow + +@FlowExtPreview +@PublishSelectorDsl +public sealed interface SelectorScope { + @PublishSelectorDsl + public fun select(block: SelectorFunction): Flow +} + +private typealias SimpleSuspendLazyOfAnyFlow = SimpleSuspendLazy> + +@FlowExtPreview +private sealed interface SelectorScopeState { + data object Init : SelectorScopeState + + sealed interface NotFrozen : SelectorScopeState { + val blocks: List> + + data class InSelectClause( + override val blocks: List>, + ) : NotFrozen + + data class NotInSelectClause( + override val blocks: List>, + ) : NotFrozen + } + + data class Frozen( + val selectedFlowsAndChannels: SimpleLazy< + Pair< + List, + List>, + >, + >, + val completedCount: Int, + val blocks: List>, + ) : SelectorScopeState + + data object Closed : SelectorScopeState +} + +@FlowExtPreview +@OptIn(DelicateCoroutinesApi::class) +private class DefaultSelectorScope( + @JvmField val scope: CoroutineScope, +) : SelectorScope, + SelectorSharedFlowScope { + @JvmField + val stateRef = AtomicRef>(SelectorScopeState.Init) + + override fun select(block: SelectorFunction): Flow { + println("call select with block: $block") + + stateRef.loop { state -> + val updated = when (state) { + SelectorScopeState.Closed -> { + error("This scope is closed") + } + + is SelectorScopeState.Frozen -> { + error("This scope is frozen. `select` only can be called inside `publish`, do not use `SelectorScope` outside `publish`") + } + + SelectorScopeState.Init -> { + // Ok, lets transition to NotFrozen.InSelectClause + SelectorScopeState.NotFrozen.InSelectClause(blocks = listOf(block)) + } + + is SelectorScopeState.NotFrozen.InSelectClause -> { + error("`select` can not be called inside another `select`") + } + + is SelectorScopeState.NotFrozen.NotInSelectClause -> { + // Ok, lets transition to NotFrozen.InSelectClause + SelectorScopeState.NotFrozen.InSelectClause(blocks = state.blocks + block) + } + } + + if (stateRef.compareAndSet(expect = state, update = updated)) { + // CAS success + val index = updated.blocks.size - 1 + + val result = defer { + // Only frozen state can reach here, + // that means we collect the output flow after frozen this scope + val stateWhenCollecting = stateRef.value + check(stateWhenCollecting is SelectorScopeState.Frozen) { "only frozen state can reach here!" } + + @Suppress("UNCHECKED_CAST") // We know that the type is correct + stateWhenCollecting + .selectedFlowsAndChannels + .getValue() + .first[index] + .getValue() + as Flow + }.onCompletion { onCompleteSelectedFlow(index) } + + transitionToNotInSelectClause() + + return result + } + } + } + + private fun transitionToNotInSelectClause() { + stateRef.loop { state -> + val updated = when (state) { + is SelectorScopeState.NotFrozen.InSelectClause -> { + // Ok, lets transition to NotFrozen.NotInSelectClause + SelectorScopeState.NotFrozen.NotInSelectClause(blocks = state.blocks) + } + + is SelectorScopeState.NotFrozen.NotInSelectClause -> { + // Ok, state already is NotFrozen.NotInSelectClause + return + } + + SelectorScopeState.Closed -> { + error("This scope is closed") + } + + is SelectorScopeState.Frozen -> { + error("This scope is frozen. `select` only can be called inside `publish`, do not use `SelectorScope` outside `publish`") + } + + SelectorScopeState.Init -> { + error("Cannot be here!") + } + } + + if (stateRef.compareAndSet(expect = state, update = updated)) { + // CAS success + return + } + } + } + + private fun onCompleteSelectedFlow(index: Int) { + stateRef.loop { state -> + val updated = when (state) { + SelectorScopeState.Init -> { + error("Cannot be here!") + } + + is SelectorScopeState.NotFrozen.InSelectClause -> { + error("Cannot be here!") + } + + is SelectorScopeState.NotFrozen.NotInSelectClause -> { + error("Cannot be here!") + } + + is SelectorScopeState.Frozen -> { + if (state.completedCount == state.blocks.size) { + // Ok, all output flows are completed. Lets transition to DefaultSelectorScopeState.Closed + SelectorScopeState.Closed + } else { + // Ok, lets transition to DefaultSelectorScopeState.Frozen with completedCount=completedCount + 1 + state.copy(completedCount = state.completedCount + 1) + } + } + + SelectorScopeState.Closed -> { + // Ok, already closed. Do nothing. + return + } + } + + if (stateRef.compareAndSet(expect = state, update = updated)) { + // CAS success + + println( + "onCompleteSelectedFlow: completedCount = ${(updated as? SelectorScopeState.Frozen)?.completedCount}, " + + "size = ${state.blocks.size}, " + + "(index = $index)", + ) + + // Once state reaches DefaultSelectorScopeState.Closed, we can clear unused lazy + if (updated is SelectorScopeState.Closed) { + state.selectedFlowsAndChannels.run { + getOrNull()?.first?.forEach { it.clear() } + clear() + } + + println("onCompleteSelectedFlow: cancel the publish scope") + } + + return + } + } + } + + override fun Flow.shareIn(replay: Int, started: SharingStarted): SharedFlow = + kotlinXFlowShareIn( + scope = scope, + started = started, + replay = replay, + ) + + fun freezeAndInit() { + stateRef.loop { state -> + // Transition from NotFrozen to Frozen + when (state) { + SelectorScopeState.Init -> { + error("Not implemented") + } + + is SelectorScopeState.NotFrozen -> { + // Freeze and init + } + + is SelectorScopeState.Frozen, SelectorScopeState.Closed -> { + // Already frozen or closed + return + } + } + + val blocks = state.blocks + val size = blocks.size + val updated = SelectorScopeState.Frozen( + selectedFlowsAndChannels = simpleLazyOf { + val channels = List(size) { Channel() } + + List(size) { index -> + val block = blocks[index] + val flow = channels[index].consumeAsFlow() + simpleSuspendLazy { this.block(flow) } + } to channels + }, + completedCount = 0, + blocks = blocks, + ) + + if (stateRef.compareAndSet(expect = state, update = updated)) { + // CAS success + return + } + + // clear unused lazy + updated.selectedFlowsAndChannels.clear() + } + } + + suspend fun send(value: T) { + println("send: $value") + + val state = stateRef.value as? SelectorScopeState.Frozen ?: return + val channels = state + .selectedFlowsAndChannels + .getValue() + .second + + for (channel in channels) { + if (channel.isClosedForSend || channel.isClosedForReceive) { + continue + } + + try { + channel.send(value) + } catch (_: Throwable) { + // Swallow all exceptions + } + } + } + + private fun transitionToClosed(action: (Channel) -> Unit) { + stateRef.loop { state -> + val updated = when (state) { + SelectorScopeState.Init -> { + error("Cannot be here!") + } + + is SelectorScopeState.NotFrozen.InSelectClause -> { + error("Cannot be here!") + } + + is SelectorScopeState.NotFrozen.NotInSelectClause -> { + error("Cannot be here!") + } + + is SelectorScopeState.Frozen -> { + // Ok, lets transition to DefaultSelectorScopeState.Closed + SelectorScopeState.Closed + } + + SelectorScopeState.Closed -> { + // Ok, already closed. Do nothing. + return + } + } + + if (stateRef.compareAndSet(expect = state, update = updated)) { + // CAS success + + // Once state reaches DefaultSelectorScopeState.Closed, we can clear unused lazy and close all channels + state.selectedFlowsAndChannels.run { + getOrNull()?.first?.forEach { it.clear() } + getOrNull()?.second?.forEach(action) + clear() + } + + return + } + } + } + + fun close(e: Throwable?) { + println("close: $e") + transitionToClosed { it.close(e) } + } + + fun cancel(e: CancellationException) { + println("cancel: $e") + transitionToClosed { it.cancel(e) } + } +} + +@FlowExtPreview +public fun Flow.publish(selector: suspend SelectorScope.() -> Flow): Flow { + val source = this + + return flow { + coroutineScope { + val scope = DefaultSelectorScope(this) + + val output = selector(scope) + + // IMPORTANT: freeze and init before collect the output flow + scope.freezeAndInit() + + launch { + try { + source.collect { value -> return@collect scope.send(value) } + scope.close(null) + } catch (e: CancellationException) { + scope.cancel(e) + throw e + } catch (e: Throwable) { + scope.close(e) + throw e + } + } + + // IMPORTANT: collect the output flow after frozen the scope + emitAll(output) + } + } +} + +@OptIn(FlowExtPreview::class, ExperimentalCoroutinesApi::class) +public suspend fun main() { + flow { + println("Collect...") + delay(100) + emit(1) + delay(100) + emit(2) + delay(100) + emit(3) + delay(100) + emit("4") + }.onEach { println(">>> onEach: $it") } + .publish { + delay(100) + + merge( + select { flow -> + delay(1) + val sharedFlow = flow.shareIn() + + interval(0, 100) + .onEach { println(">>> interval: $it") } + .flatMapMerge { value -> + timer(value, 50) + .withLatestFrom(sharedFlow) + .map { it to "shared" } + }.takeUntil(sharedFlow.filter { it == 3 }) + }, + select { flow -> + flow + .filterIsInstance() + .filter { it % 2 == 0 } + .map { it to "even" } + .take(1) + }, + select { flow -> + flow + .filterIsInstance() + .filter { it % 2 != 0 } + .map { it to "odd" } + .take(1) + }, + select { flow -> + flow + .filterIsInstance() + .map { it to "string" } + .take(1) + }, + ) + } + .toList() + .also { println(it) } + .let { check(it == listOf(Pair(1, "odd"), Pair(2, "even"), Pair("4", "string"))) } +}