diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 4c6280eb..aae8b7c8 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -9,4 +9,4 @@ - \ No newline at end of file + diff --git a/CHANGELOG.md b/CHANGELOG.md index 49b1fa18..24f2f294 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,11 @@ ### Added - Add `Flow.chunked` operator, it is an alias to `Flow.bufferCount` operator. +- Add `Flow.pairwise(transform)` operator - a variant of `Flow.pairwise()` operator, + which allows the transformation of the pair of values via the `transform` lambda parameter. + +- Add `Flow.zipWithNext()` operator, it is an alias to `Flow.pairwise()` operator. +- Add `Flow.zipWithNext(transform)` operator, it is an alias to `Flow.pairwise(transform)` operator. ## [0.7.2] - Oct 7, 2023 diff --git a/README.md b/README.md index fa61c79a..a28d41bb 100644 --- a/README.md +++ b/README.md @@ -25,8 +25,11 @@ > Kotlinx Coroutines Flow Extensions. Extensions to the Kotlin Flow library. Kotlin Flow extensions. > Multiplatform Kotlinx Coroutines Flow Extensions. Multiplatform Extensions to the Kotlin Flow > library. Multiplatform Kotlin Flow extensions. RxJS Kotlin Coroutines Flow. RxSwift Kotlin -> Coroutines Flow. RxJava Kotlin Coroutines Flow. RxJS Kotlin Flow. RxSwift Kotlin Flow. RxJava Kotlin -> Flow. RxJS Coroutines Flow. RxSwift Coroutines Flow. RxJava Coroutines Flow. +> Coroutines Flow. RxJava Kotlin Coroutines Flow. RxJS Kotlin Flow. RxSwift Kotlin Flow. RxJava +> Kotlin +> Flow. RxJS Coroutines Flow. RxSwift Coroutines Flow. RxJava Coroutines Flow. Kotlin Flow +> operators. +> Coroutines Flow operators. ## Author: [Petrus Nguyễn Thái Học](https://github.com/hoc081098) @@ -145,7 +148,7 @@ dependencies { - [`dematerialize`](#dematerialize) - [`raceWith`](#racewith--ambwith) - [`ambWith`](#racewith--ambwith) - - [`pairwise`](#pairwise) + - [`pairwise`](#pairwise--zipWithNext) - [`repeat`](#repeat) - [`retryWhenWithDelayStrategy`](#retrywhenwithdelaystrategy) - [`retryWhenWithExponentialBackoff`](#retrywhenwithexponentialbackoff) @@ -157,6 +160,7 @@ dependencies { - [`takeUntil`](#takeuntil) - [`throttleTime`](#throttletime) - [`withLatestFrom`](#withlatestfrom) + - [`zipWithNext`](#pairwise--zipWithNext) #### bufferCount / chunked @@ -871,18 +875,25 @@ raceWith: 3 ---- -#### pairwise +#### pairwise / zipWithNext - Similar to [RxJS pairwise](https://rxjs.dev/api/operators/pairwise) Groups pairs of consecutive emissions together and emits them as a pair. Emits the `(n)th` and `(n-1)th` events as a pair. The first value won't be emitted until the second one arrives. +Note, `zipWithNext` is an alias to `pairwise`. ```kotlin range(0, 4) .pairwise() .collect { println("pairwise: $it") } + +println("---") + +range(0, 4) + .zipWithNext { a, b -> "$a -> $b" } + .collect { println("zipWithNext: $it") } ``` Output: @@ -891,6 +902,10 @@ Output: pairwise: (0, 1) pairwise: (1, 2) pairwise: (2, 3) +--- +zipWithNext: 0 -> 1 +zipWithNext: 1 -> 2 +zipWithNext: 2 -> 3 ``` ---- diff --git a/api/FlowExt.api b/api/FlowExt.api index 06efabd3..8289218c 100644 --- a/api/FlowExt.api +++ b/api/FlowExt.api @@ -68,6 +68,9 @@ public final class com/hoc081098/flowext/DelayStrategy$NoDelayStrategy : com/hoc public fun nextDelay-3nIYWDw (Ljava/lang/Throwable;J)J } +public abstract interface annotation class com/hoc081098/flowext/DelicateFlowExtApi : java/lang/annotation/Annotation { +} + public final class com/hoc081098/flowext/EagerKt { public static final fun flatMapConcatEager (Lkotlinx/coroutines/flow/Flow;IILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static synthetic fun flatMapConcatEager$default (Lkotlinx/coroutines/flow/Flow;IILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; @@ -183,6 +186,9 @@ public final class com/hoc081098/flowext/NeverFlowKt { public final class com/hoc081098/flowext/PairwiseKt { public static final fun pairwise (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; + public static final fun pairwise (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; + public static final fun zipWithNext (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; + public static final fun zipWithNext (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; } public final class com/hoc081098/flowext/RaceKt { diff --git a/build.gradle.kts b/build.gradle.kts index 543a228a..aac4c769 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -31,6 +31,14 @@ repositories { kotlin { explicitApi() + sourceSets { + all { + languageSettings { + optIn("com.hoc081098.flowext.DelicateFlowExtApi") + } + } + } + jvmToolchain { languageVersion.set(JavaLanguageVersion.of(17)) vendor.set(JvmVendorSpec.AZUL) diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/DelicateFlowExtApi.kt b/src/commonMain/kotlin/com/hoc081098/flowext/DelicateFlowExtApi.kt new file mode 100644 index 00000000..c933ff88 --- /dev/null +++ b/src/commonMain/kotlin/com/hoc081098/flowext/DelicateFlowExtApi.kt @@ -0,0 +1,41 @@ +/* + * MIT License + * + * Copyright (c) 2021-2023 Petrus Nguyễn Thái Học + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.hoc081098.flowext + +/** + * Marks declarations in the `FlowExt` that are **delicate** — + * they have limited use-case and shall be used with care in general code. + * Any use of a delicate declaration has to be carefully reviewed to make sure it is + * properly used and does not create problems like memory and resource leaks. + * Carefully read documentation of any declaration marked as `DelicateFlowExtApi`. + */ +@MustBeDocumented +@Retention(value = AnnotationRetention.BINARY) +@RequiresOptIn( + level = RequiresOptIn.Level.WARNING, + message = "This is a delicate API and its use requires care." + + " Make sure you fully read and understand documentation of the declaration that is marked as a delicate API.", +) +public annotation class DelicateFlowExtApi diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/pairwise.kt b/src/commonMain/kotlin/com/hoc081098/flowext/pairwise.kt index 5120397b..ff0c2d6e 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/pairwise.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/pairwise.kt @@ -24,15 +24,18 @@ package com.hoc081098.flowext -import com.hoc081098.flowext.utils.NULL_VALUE +import com.hoc081098.flowext.utils.INTERNAL_NULL_VALUE import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow +// ------------------------------------------- PAIRWISE ------------------------------------------- + /** * Groups pairs of consecutive emissions together and emits them as a pair. * * Emits the `(n)th` and `(n-1)th` events as a pair. * The first value won't be emitted until the second one arrives. + * The resulting [Flow] is empty if this [Flow] emits less than two elements. * * This operator is more optimizer than [bufferCount] version: * ```kotlin @@ -46,13 +49,64 @@ import kotlinx.coroutines.flow.flow * } * ``` */ -public fun Flow.pairwise(): Flow> = flow { - var last: Any? = null +public fun Flow.pairwise(): Flow> = pairwiseInternal(::Pair) + +/** + * Groups pairs of consecutive emissions together and emits the result of applying [transform] + * function to each pair. + * + * The first value won't be emitted until the second one arrives. + * The resulting [Flow] is empty if this [Flow] emits less than two elements. + * + * This operator is more optimizer than [bufferCount] version: + * ```kotlin + * val flow: Flow + * + * val result: Flow = flow + * .bufferCount(bufferSize = 2, startBufferEvery = 1) + * .mapNotNull { + * if (it.size < 2) null + * else transform(it[0], it[1]) + * } + * ``` + * + * @param transform A function to apply to each pair of consecutive emissions. + */ +public fun Flow.pairwise(transform: suspend (a: T, b: T) -> R): Flow = + pairwiseInternal(transform) + +// ----------------------------------------- ZIP WITH NEXT ----------------------------------------- + +/** + * This function is an alias to [pairwise] operator. + * + * Groups pairs of consecutive emissions together and emits them as a pair. + * + * @see pairwise + */ +public fun Flow.zipWithNext(): Flow> = pairwise() + +/** + * This function is an alias to [pairwise] operator. + * + * Groups pairs of consecutive emissions together and emits the result of applying [transform] + * function to each pair. + * + * @see pairwise + */ +public fun Flow.zipWithNext(transform: suspend (a: T, b: T) -> R): Flow = + pairwise(transform) + +// ------------------------------------------- INTERNAL ------------------------------------------- + +private fun Flow.pairwiseInternal(transform: suspend (a: T, b: T) -> R): Flow = flow { + var last: Any? = INTERNAL_NULL_VALUE collect { - if (last !== null) { - emit(Pair(NULL_VALUE.unbox(last), it)) + if (last !== INTERNAL_NULL_VALUE) { + @Suppress("UNCHECKED_CAST") + emit(transform(last as T, it)) } - last = it ?: NULL_VALUE + last = it } } diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/selectors.kt b/src/commonMain/kotlin/com/hoc081098/flowext/selectors.kt index 96748e38..e155ba6f 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/selectors.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/selectors.kt @@ -26,7 +26,7 @@ package com.hoc081098.flowext -import com.hoc081098.flowext.utils.NULL_VALUE +import com.hoc081098.flowext.utils.INTERNAL_NULL_VALUE import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.flow @@ -36,12 +36,12 @@ private typealias SubStateT = Any? internal fun Flow.select1Internal( selector: suspend (State) -> Result, ): Flow = flow { - var latestState: Any? = NULL_VALUE // Result | NULL_VALUE + var latestState: Any? = INTERNAL_NULL_VALUE // Result | NULL_VALUE distinctUntilChanged().collect { state -> val currentState = selector(state) - if (latestState === NULL_VALUE || (latestState as Result) != currentState) { + if (latestState === INTERNAL_NULL_VALUE || (latestState as Result) != currentState) { latestState = currentState emit(currentState) } @@ -56,7 +56,7 @@ private fun Flow.selectInternal( return flow { var latestSubStates: Array? = null - var latestState: Any? = NULL_VALUE // Result | NULL_VALUE + var latestState: Any? = INTERNAL_NULL_VALUE // Result | NULL_VALUE var reusableSubStates: Array? = null distinctUntilChanged().collect { state -> @@ -74,7 +74,7 @@ private fun Flow.selectInternal( .also { latestSubStates = it }, ) - if (latestState === NULL_VALUE || (latestState as Result) != currentState) { + if (latestState === INTERNAL_NULL_VALUE || (latestState as Result) != currentState) { latestState = currentState emit(currentState) } diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/throttle.kt b/src/commonMain/kotlin/com/hoc081098/flowext/throttle.kt index a0cd28c0..ac516854 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/throttle.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/throttle.kt @@ -28,7 +28,7 @@ import com.hoc081098.flowext.ThrottleConfiguration.LEADING import com.hoc081098.flowext.ThrottleConfiguration.LEADING_AND_TRAILING import com.hoc081098.flowext.ThrottleConfiguration.TRAILING import com.hoc081098.flowext.internal.DONE_VALUE -import com.hoc081098.flowext.utils.NULL_VALUE +import com.hoc081098.flowext.utils.INTERNAL_NULL_VALUE import kotlin.time.Duration import kotlin.time.Duration.Companion.milliseconds import kotlinx.coroutines.ExperimentalCoroutinesApi @@ -244,7 +244,7 @@ public fun Flow.throttleTime( // Produce the values using the default (rendezvous) channel val values = produce { - collect { value -> send(value ?: NULL_VALUE) } + collect { value -> send(value ?: INTERNAL_NULL_VALUE) } } var lastValue: Any? = null @@ -258,7 +258,7 @@ public fun Flow.throttleTime( // before we emit, otherwise reentrant code can cause // issues here. lastValue = null // Consume the value - return@let downstream.emit(NULL_VALUE.unbox(consumed)) + return@let downstream.emit(INTERNAL_NULL_VALUE.unbox(consumed)) } } @@ -290,7 +290,7 @@ public fun Flow.throttleTime( trySend() } - when (val duration = durationSelector(NULL_VALUE.unbox(value))) { + when (val duration = durationSelector(INTERNAL_NULL_VALUE.unbox(value))) { Duration.ZERO -> onWindowClosed() else -> throttled = scope.launch { delay(duration) } } diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/utils/NULL_VALUE.kt b/src/commonMain/kotlin/com/hoc081098/flowext/utils/NULL_VALUE.kt index 96f2b231..ed91fe90 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/utils/NULL_VALUE.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/utils/NULL_VALUE.kt @@ -24,6 +24,7 @@ package com.hoc081098.flowext.utils +import com.hoc081098.flowext.DelicateFlowExtApi import kotlin.jvm.JvmField /** @@ -31,5 +32,16 @@ import kotlin.jvm.JvmField * This allows for writing faster generic code instead of using `Option`. * This is only used as an optimisation technique in low-level code. */ +@DelicateFlowExtApi @JvmField public val NULL_VALUE: Symbol = Symbol("NULL_VALUE") + +/** + * This is a work-around for having nested nulls in generic code. + * This allows for writing faster generic code instead of using `Option`. + * This is only used as an optimisation technique in low-level code. + * + * This is internal and should not be used outside of the library. + */ +@JvmField +internal val INTERNAL_NULL_VALUE: Symbol = Symbol("INTERNAL_NULL_VALUE") diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/utils/Symbol.kt b/src/commonMain/kotlin/com/hoc081098/flowext/utils/Symbol.kt index 3ea02939..d8299f32 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/utils/Symbol.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/utils/Symbol.kt @@ -24,11 +24,13 @@ package com.hoc081098.flowext.utils +import com.hoc081098.flowext.DelicateFlowExtApi import kotlin.jvm.JvmField /** * A symbol class that is used to define unique constants that are self-explanatory in debugger. */ +@DelicateFlowExtApi public class Symbol( @JvmField public val symbol: String, ) { diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/withLatestFrom.kt b/src/commonMain/kotlin/com/hoc081098/flowext/withLatestFrom.kt index cbe907d4..42d643c0 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/withLatestFrom.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/withLatestFrom.kt @@ -25,7 +25,7 @@ package com.hoc081098.flowext import com.hoc081098.flowext.internal.AtomicRef -import com.hoc081098.flowext.utils.NULL_VALUE +import com.hoc081098.flowext.utils.INTERNAL_NULL_VALUE import kotlinx.coroutines.CoroutineStart import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.flow.Flow @@ -49,14 +49,14 @@ public fun Flow.withLatestFrom( try { coroutineScope { launch(start = CoroutineStart.UNDISPATCHED) { - other.collect { otherRef.value = it ?: NULL_VALUE } + other.collect { otherRef.value = it ?: INTERNAL_NULL_VALUE } } collect { value -> emit( transform( value, - NULL_VALUE.unbox(otherRef.value ?: return@collect), + INTERNAL_NULL_VALUE.unbox(otherRef.value ?: return@collect), ), ) } diff --git a/src/commonTest/kotlin/com/hoc081098/flowext/PairwiseTest.kt b/src/commonTest/kotlin/com/hoc081098/flowext/PairwiseTest.kt index 81084415..3ea7ff06 100644 --- a/src/commonTest/kotlin/com/hoc081098/flowext/PairwiseTest.kt +++ b/src/commonTest/kotlin/com/hoc081098/flowext/PairwiseTest.kt @@ -29,7 +29,10 @@ import com.hoc081098.flowext.utils.TestException import com.hoc081098.flowext.utils.assertFailsWith import com.hoc081098.flowext.utils.test import kotlin.test.Test +import kotlin.test.assertIs import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.emptyFlow import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flowOf @@ -37,6 +40,10 @@ import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.mapNotNull import kotlinx.coroutines.flow.take +private data class MyTuple2(val first: A, val second: B) + +private inline infix fun A.with(second: B) = MyTuple2(this, second) + @ExperimentalCoroutinesApi class PairwiseTest : BaseStepTest() { @Test @@ -69,6 +76,17 @@ class PairwiseTest : BaseStepTest() { Event.Complete, ), ) + + range(0, 4) + .zipWithNext() + .test( + listOf( + Event.Value(0 to 1), + Event.Value(1 to 2), + Event.Value(2 to 3), + Event.Complete, + ), + ) } @Test @@ -132,3 +150,158 @@ class PairwiseTest : BaseStepTest() { ) } } + +@ExperimentalCoroutinesApi +class PairwiseWithTransformTest : BaseStepTest() { + @Test + fun testPairwise() = runTest { + range(0, 4) + .pairwise(::MyTuple2) + .also { assertIs>>(it) } + .test( + listOf( + Event.Value(0 with 1), + Event.Value(1 with 2), + Event.Value(2 with 3), + Event.Complete, + ), + ) + + range(0, 4) + .bufferCount(bufferSize = 2, startBufferEvery = 1) + .mapNotNull { + if (it.size < 2) { + null + } else { + it[0] with it[1] + } + } + .also { assertIs>>(it) } + .test( + listOf( + Event.Value(0 with 1), + Event.Value(1 with 2), + Event.Value(2 with 3), + Event.Complete, + ), + ) + + range(0, 4) + .zipWithNext(::MyTuple2) + .also { assertIs>>(it) } + .test( + listOf( + Event.Value(0 with 1), + Event.Value(1 with 2), + Event.Value(2 with 3), + Event.Complete, + ), + ) + } + + @Test + fun testPairwiseWithSuspend() = runTest { + range(0, 4) + .pairwise { a, b -> + delay(100) + MyTuple2(a, b) + } + .also { assertIs>>(it) } + .test( + listOf( + Event.Value(0 with 1), + Event.Value(1 with 2), + Event.Value(2 with 3), + Event.Complete, + ), + ) + } + + @Test + fun testPairwiseNullable() = runTest { + // 0 - null - 2 - null + + range(0, 4) + .map { it.takeIf { it % 2 == 0 } } + .pairwise(::MyTuple2) + .test( + listOf( + Event.Value(0 with null), + Event.Value(null with 2), + Event.Value(2 with null), + Event.Complete, + ), + ) + } + + @Test + fun testPairwiseEmpty() = runTest { + emptyFlow() + .pairwise(::MyTuple2) + .test( + listOf( + Event.Complete, + ), + ) + } + + @Test + fun testPairwiseSingle() = runTest { + flowOf(1) + .pairwise(::MyTuple2) + .test( + listOf( + Event.Complete, + ), + ) + } + + @Test + fun testPairwiseFailureUpstream() = runTest { + assertFailsWith( + flow { throw TestException() } + .pairwise(::MyTuple2), + ) + } + + @Test + fun testPairwiseFailureTransform() = runTest { + // empty flow will not call transform function + emptyFlow() + .pairwise<_, String> { _, _ -> throw TestException("Broken!") } + .test( + listOf( + Event.Complete, + ), + ) + + // single element flow will not call transform function + flowOf(1) + .pairwise<_, String> { _, _ -> throw TestException("Broken!") } + .test( + listOf( + Event.Complete, + ), + ) + + // propagate exception from transform function + assertFailsWith( + flowOf(1, 2) + .pairwise<_, String> { _, _ -> throw TestException("Broken!") }, + ) + } + + @Test + fun testPairwiseCancellation() = runTest { + range(1, 100) + .pairwise(::MyTuple2) + .take(2) + .test( + listOf( + Event.Value(1 with 2), + Event.Value(2 with 3), + Event.Complete, + ), + ) + } +}