diff --git a/.idea/vcs.xml b/.idea/vcs.xml
index 4c6280eb..aae8b7c8 100644
--- a/.idea/vcs.xml
+++ b/.idea/vcs.xml
@@ -9,4 +9,4 @@
-
\ No newline at end of file
+
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 49b1fa18..24f2f294 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,11 @@
### Added
- Add `Flow.chunked` operator, it is an alias to `Flow.bufferCount` operator.
+- Add `Flow.pairwise(transform)` operator - a variant of `Flow.pairwise()` operator,
+ which allows the transformation of the pair of values via the `transform` lambda parameter.
+
+- Add `Flow.zipWithNext()` operator, it is an alias to `Flow.pairwise()` operator.
+- Add `Flow.zipWithNext(transform)` operator, it is an alias to `Flow.pairwise(transform)` operator.
## [0.7.2] - Oct 7, 2023
diff --git a/README.md b/README.md
index fa61c79a..a28d41bb 100644
--- a/README.md
+++ b/README.md
@@ -25,8 +25,11 @@
> Kotlinx Coroutines Flow Extensions. Extensions to the Kotlin Flow library. Kotlin Flow extensions.
> Multiplatform Kotlinx Coroutines Flow Extensions. Multiplatform Extensions to the Kotlin Flow
> library. Multiplatform Kotlin Flow extensions. RxJS Kotlin Coroutines Flow. RxSwift Kotlin
-> Coroutines Flow. RxJava Kotlin Coroutines Flow. RxJS Kotlin Flow. RxSwift Kotlin Flow. RxJava Kotlin
-> Flow. RxJS Coroutines Flow. RxSwift Coroutines Flow. RxJava Coroutines Flow.
+> Coroutines Flow. RxJava Kotlin Coroutines Flow. RxJS Kotlin Flow. RxSwift Kotlin Flow. RxJava
+> Kotlin
+> Flow. RxJS Coroutines Flow. RxSwift Coroutines Flow. RxJava Coroutines Flow. Kotlin Flow
+> operators.
+> Coroutines Flow operators.
## Author: [Petrus Nguyễn Thái Học](https://github.com/hoc081098)
@@ -145,7 +148,7 @@ dependencies {
- [`dematerialize`](#dematerialize)
- [`raceWith`](#racewith--ambwith)
- [`ambWith`](#racewith--ambwith)
- - [`pairwise`](#pairwise)
+ - [`pairwise`](#pairwise--zipWithNext)
- [`repeat`](#repeat)
- [`retryWhenWithDelayStrategy`](#retrywhenwithdelaystrategy)
- [`retryWhenWithExponentialBackoff`](#retrywhenwithexponentialbackoff)
@@ -157,6 +160,7 @@ dependencies {
- [`takeUntil`](#takeuntil)
- [`throttleTime`](#throttletime)
- [`withLatestFrom`](#withlatestfrom)
+ - [`zipWithNext`](#pairwise--zipWithNext)
#### bufferCount / chunked
@@ -871,18 +875,25 @@ raceWith: 3
----
-#### pairwise
+#### pairwise / zipWithNext
- Similar to [RxJS pairwise](https://rxjs.dev/api/operators/pairwise)
Groups pairs of consecutive emissions together and emits them as a pair.
Emits the `(n)th` and `(n-1)th` events as a pair.
The first value won't be emitted until the second one arrives.
+Note, `zipWithNext` is an alias to `pairwise`.
```kotlin
range(0, 4)
.pairwise()
.collect { println("pairwise: $it") }
+
+println("---")
+
+range(0, 4)
+ .zipWithNext { a, b -> "$a -> $b" }
+ .collect { println("zipWithNext: $it") }
```
Output:
@@ -891,6 +902,10 @@ Output:
pairwise: (0, 1)
pairwise: (1, 2)
pairwise: (2, 3)
+---
+zipWithNext: 0 -> 1
+zipWithNext: 1 -> 2
+zipWithNext: 2 -> 3
```
----
diff --git a/api/FlowExt.api b/api/FlowExt.api
index 06efabd3..8289218c 100644
--- a/api/FlowExt.api
+++ b/api/FlowExt.api
@@ -68,6 +68,9 @@ public final class com/hoc081098/flowext/DelayStrategy$NoDelayStrategy : com/hoc
public fun nextDelay-3nIYWDw (Ljava/lang/Throwable;J)J
}
+public abstract interface annotation class com/hoc081098/flowext/DelicateFlowExtApi : java/lang/annotation/Annotation {
+}
+
public final class com/hoc081098/flowext/EagerKt {
public static final fun flatMapConcatEager (Lkotlinx/coroutines/flow/Flow;IILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun flatMapConcatEager$default (Lkotlinx/coroutines/flow/Flow;IILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
@@ -183,6 +186,9 @@ public final class com/hoc081098/flowext/NeverFlowKt {
public final class com/hoc081098/flowext/PairwiseKt {
public static final fun pairwise (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun pairwise (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun zipWithNext (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun zipWithNext (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
}
public final class com/hoc081098/flowext/RaceKt {
diff --git a/build.gradle.kts b/build.gradle.kts
index 543a228a..aac4c769 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -31,6 +31,14 @@ repositories {
kotlin {
explicitApi()
+ sourceSets {
+ all {
+ languageSettings {
+ optIn("com.hoc081098.flowext.DelicateFlowExtApi")
+ }
+ }
+ }
+
jvmToolchain {
languageVersion.set(JavaLanguageVersion.of(17))
vendor.set(JvmVendorSpec.AZUL)
diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/DelicateFlowExtApi.kt b/src/commonMain/kotlin/com/hoc081098/flowext/DelicateFlowExtApi.kt
new file mode 100644
index 00000000..c933ff88
--- /dev/null
+++ b/src/commonMain/kotlin/com/hoc081098/flowext/DelicateFlowExtApi.kt
@@ -0,0 +1,41 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2021-2023 Petrus Nguyễn Thái Học
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.hoc081098.flowext
+
+/**
+ * Marks declarations in the `FlowExt` that are **delicate** —
+ * they have limited use-case and shall be used with care in general code.
+ * Any use of a delicate declaration has to be carefully reviewed to make sure it is
+ * properly used and does not create problems like memory and resource leaks.
+ * Carefully read documentation of any declaration marked as `DelicateFlowExtApi`.
+ */
+@MustBeDocumented
+@Retention(value = AnnotationRetention.BINARY)
+@RequiresOptIn(
+ level = RequiresOptIn.Level.WARNING,
+ message = "This is a delicate API and its use requires care." +
+ " Make sure you fully read and understand documentation of the declaration that is marked as a delicate API.",
+)
+public annotation class DelicateFlowExtApi
diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/pairwise.kt b/src/commonMain/kotlin/com/hoc081098/flowext/pairwise.kt
index 5120397b..ff0c2d6e 100644
--- a/src/commonMain/kotlin/com/hoc081098/flowext/pairwise.kt
+++ b/src/commonMain/kotlin/com/hoc081098/flowext/pairwise.kt
@@ -24,15 +24,18 @@
package com.hoc081098.flowext
-import com.hoc081098.flowext.utils.NULL_VALUE
+import com.hoc081098.flowext.utils.INTERNAL_NULL_VALUE
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
+// ------------------------------------------- PAIRWISE -------------------------------------------
+
/**
* Groups pairs of consecutive emissions together and emits them as a pair.
*
* Emits the `(n)th` and `(n-1)th` events as a pair.
* The first value won't be emitted until the second one arrives.
+ * The resulting [Flow] is empty if this [Flow] emits less than two elements.
*
* This operator is more optimizer than [bufferCount] version:
* ```kotlin
@@ -46,13 +49,64 @@ import kotlinx.coroutines.flow.flow
* }
* ```
*/
-public fun Flow.pairwise(): Flow> = flow {
- var last: Any? = null
+public fun Flow.pairwise(): Flow> = pairwiseInternal(::Pair)
+
+/**
+ * Groups pairs of consecutive emissions together and emits the result of applying [transform]
+ * function to each pair.
+ *
+ * The first value won't be emitted until the second one arrives.
+ * The resulting [Flow] is empty if this [Flow] emits less than two elements.
+ *
+ * This operator is more optimizer than [bufferCount] version:
+ * ```kotlin
+ * val flow: Flow
+ *
+ * val result: Flow = flow
+ * .bufferCount(bufferSize = 2, startBufferEvery = 1)
+ * .mapNotNull {
+ * if (it.size < 2) null
+ * else transform(it[0], it[1])
+ * }
+ * ```
+ *
+ * @param transform A function to apply to each pair of consecutive emissions.
+ */
+public fun Flow.pairwise(transform: suspend (a: T, b: T) -> R): Flow =
+ pairwiseInternal(transform)
+
+// ----------------------------------------- ZIP WITH NEXT -----------------------------------------
+
+/**
+ * This function is an alias to [pairwise] operator.
+ *
+ * Groups pairs of consecutive emissions together and emits them as a pair.
+ *
+ * @see pairwise
+ */
+public fun Flow.zipWithNext(): Flow> = pairwise()
+
+/**
+ * This function is an alias to [pairwise] operator.
+ *
+ * Groups pairs of consecutive emissions together and emits the result of applying [transform]
+ * function to each pair.
+ *
+ * @see pairwise
+ */
+public fun Flow.zipWithNext(transform: suspend (a: T, b: T) -> R): Flow =
+ pairwise(transform)
+
+// ------------------------------------------- INTERNAL -------------------------------------------
+
+private fun Flow.pairwiseInternal(transform: suspend (a: T, b: T) -> R): Flow = flow {
+ var last: Any? = INTERNAL_NULL_VALUE
collect {
- if (last !== null) {
- emit(Pair(NULL_VALUE.unbox(last), it))
+ if (last !== INTERNAL_NULL_VALUE) {
+ @Suppress("UNCHECKED_CAST")
+ emit(transform(last as T, it))
}
- last = it ?: NULL_VALUE
+ last = it
}
}
diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/selectors.kt b/src/commonMain/kotlin/com/hoc081098/flowext/selectors.kt
index 96748e38..e155ba6f 100644
--- a/src/commonMain/kotlin/com/hoc081098/flowext/selectors.kt
+++ b/src/commonMain/kotlin/com/hoc081098/flowext/selectors.kt
@@ -26,7 +26,7 @@
package com.hoc081098.flowext
-import com.hoc081098.flowext.utils.NULL_VALUE
+import com.hoc081098.flowext.utils.INTERNAL_NULL_VALUE
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flow
@@ -36,12 +36,12 @@ private typealias SubStateT = Any?
internal fun Flow.select1Internal(
selector: suspend (State) -> Result,
): Flow = flow {
- var latestState: Any? = NULL_VALUE // Result | NULL_VALUE
+ var latestState: Any? = INTERNAL_NULL_VALUE // Result | NULL_VALUE
distinctUntilChanged().collect { state ->
val currentState = selector(state)
- if (latestState === NULL_VALUE || (latestState as Result) != currentState) {
+ if (latestState === INTERNAL_NULL_VALUE || (latestState as Result) != currentState) {
latestState = currentState
emit(currentState)
}
@@ -56,7 +56,7 @@ private fun Flow.selectInternal(
return flow {
var latestSubStates: Array? = null
- var latestState: Any? = NULL_VALUE // Result | NULL_VALUE
+ var latestState: Any? = INTERNAL_NULL_VALUE // Result | NULL_VALUE
var reusableSubStates: Array? = null
distinctUntilChanged().collect { state ->
@@ -74,7 +74,7 @@ private fun Flow.selectInternal(
.also { latestSubStates = it },
)
- if (latestState === NULL_VALUE || (latestState as Result) != currentState) {
+ if (latestState === INTERNAL_NULL_VALUE || (latestState as Result) != currentState) {
latestState = currentState
emit(currentState)
}
diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/throttle.kt b/src/commonMain/kotlin/com/hoc081098/flowext/throttle.kt
index a0cd28c0..ac516854 100644
--- a/src/commonMain/kotlin/com/hoc081098/flowext/throttle.kt
+++ b/src/commonMain/kotlin/com/hoc081098/flowext/throttle.kt
@@ -28,7 +28,7 @@ import com.hoc081098.flowext.ThrottleConfiguration.LEADING
import com.hoc081098.flowext.ThrottleConfiguration.LEADING_AND_TRAILING
import com.hoc081098.flowext.ThrottleConfiguration.TRAILING
import com.hoc081098.flowext.internal.DONE_VALUE
-import com.hoc081098.flowext.utils.NULL_VALUE
+import com.hoc081098.flowext.utils.INTERNAL_NULL_VALUE
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlinx.coroutines.ExperimentalCoroutinesApi
@@ -244,7 +244,7 @@ public fun Flow.throttleTime(
// Produce the values using the default (rendezvous) channel
val values = produce {
- collect { value -> send(value ?: NULL_VALUE) }
+ collect { value -> send(value ?: INTERNAL_NULL_VALUE) }
}
var lastValue: Any? = null
@@ -258,7 +258,7 @@ public fun Flow.throttleTime(
// before we emit, otherwise reentrant code can cause
// issues here.
lastValue = null // Consume the value
- return@let downstream.emit(NULL_VALUE.unbox(consumed))
+ return@let downstream.emit(INTERNAL_NULL_VALUE.unbox(consumed))
}
}
@@ -290,7 +290,7 @@ public fun Flow.throttleTime(
trySend()
}
- when (val duration = durationSelector(NULL_VALUE.unbox(value))) {
+ when (val duration = durationSelector(INTERNAL_NULL_VALUE.unbox(value))) {
Duration.ZERO -> onWindowClosed()
else -> throttled = scope.launch { delay(duration) }
}
diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/utils/NULL_VALUE.kt b/src/commonMain/kotlin/com/hoc081098/flowext/utils/NULL_VALUE.kt
index 96f2b231..ed91fe90 100644
--- a/src/commonMain/kotlin/com/hoc081098/flowext/utils/NULL_VALUE.kt
+++ b/src/commonMain/kotlin/com/hoc081098/flowext/utils/NULL_VALUE.kt
@@ -24,6 +24,7 @@
package com.hoc081098.flowext.utils
+import com.hoc081098.flowext.DelicateFlowExtApi
import kotlin.jvm.JvmField
/**
@@ -31,5 +32,16 @@ import kotlin.jvm.JvmField
* This allows for writing faster generic code instead of using `Option`.
* This is only used as an optimisation technique in low-level code.
*/
+@DelicateFlowExtApi
@JvmField
public val NULL_VALUE: Symbol = Symbol("NULL_VALUE")
+
+/**
+ * This is a work-around for having nested nulls in generic code.
+ * This allows for writing faster generic code instead of using `Option`.
+ * This is only used as an optimisation technique in low-level code.
+ *
+ * This is internal and should not be used outside of the library.
+ */
+@JvmField
+internal val INTERNAL_NULL_VALUE: Symbol = Symbol("INTERNAL_NULL_VALUE")
diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/utils/Symbol.kt b/src/commonMain/kotlin/com/hoc081098/flowext/utils/Symbol.kt
index 3ea02939..d8299f32 100644
--- a/src/commonMain/kotlin/com/hoc081098/flowext/utils/Symbol.kt
+++ b/src/commonMain/kotlin/com/hoc081098/flowext/utils/Symbol.kt
@@ -24,11 +24,13 @@
package com.hoc081098.flowext.utils
+import com.hoc081098.flowext.DelicateFlowExtApi
import kotlin.jvm.JvmField
/**
* A symbol class that is used to define unique constants that are self-explanatory in debugger.
*/
+@DelicateFlowExtApi
public class Symbol(
@JvmField public val symbol: String,
) {
diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/withLatestFrom.kt b/src/commonMain/kotlin/com/hoc081098/flowext/withLatestFrom.kt
index cbe907d4..42d643c0 100644
--- a/src/commonMain/kotlin/com/hoc081098/flowext/withLatestFrom.kt
+++ b/src/commonMain/kotlin/com/hoc081098/flowext/withLatestFrom.kt
@@ -25,7 +25,7 @@
package com.hoc081098.flowext
import com.hoc081098.flowext.internal.AtomicRef
-import com.hoc081098.flowext.utils.NULL_VALUE
+import com.hoc081098.flowext.utils.INTERNAL_NULL_VALUE
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
@@ -49,14 +49,14 @@ public fun Flow.withLatestFrom(
try {
coroutineScope {
launch(start = CoroutineStart.UNDISPATCHED) {
- other.collect { otherRef.value = it ?: NULL_VALUE }
+ other.collect { otherRef.value = it ?: INTERNAL_NULL_VALUE }
}
collect { value ->
emit(
transform(
value,
- NULL_VALUE.unbox(otherRef.value ?: return@collect),
+ INTERNAL_NULL_VALUE.unbox(otherRef.value ?: return@collect),
),
)
}
diff --git a/src/commonTest/kotlin/com/hoc081098/flowext/PairwiseTest.kt b/src/commonTest/kotlin/com/hoc081098/flowext/PairwiseTest.kt
index 81084415..3ea7ff06 100644
--- a/src/commonTest/kotlin/com/hoc081098/flowext/PairwiseTest.kt
+++ b/src/commonTest/kotlin/com/hoc081098/flowext/PairwiseTest.kt
@@ -29,7 +29,10 @@ import com.hoc081098.flowext.utils.TestException
import com.hoc081098.flowext.utils.assertFailsWith
import com.hoc081098.flowext.utils.test
import kotlin.test.Test
+import kotlin.test.assertIs
import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
@@ -37,6 +40,10 @@ import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.take
+private data class MyTuple2(val first: A, val second: B)
+
+private inline infix fun A.with(second: B) = MyTuple2(this, second)
+
@ExperimentalCoroutinesApi
class PairwiseTest : BaseStepTest() {
@Test
@@ -69,6 +76,17 @@ class PairwiseTest : BaseStepTest() {
Event.Complete,
),
)
+
+ range(0, 4)
+ .zipWithNext()
+ .test(
+ listOf(
+ Event.Value(0 to 1),
+ Event.Value(1 to 2),
+ Event.Value(2 to 3),
+ Event.Complete,
+ ),
+ )
}
@Test
@@ -132,3 +150,158 @@ class PairwiseTest : BaseStepTest() {
)
}
}
+
+@ExperimentalCoroutinesApi
+class PairwiseWithTransformTest : BaseStepTest() {
+ @Test
+ fun testPairwise() = runTest {
+ range(0, 4)
+ .pairwise(::MyTuple2)
+ .also { assertIs>>(it) }
+ .test(
+ listOf(
+ Event.Value(0 with 1),
+ Event.Value(1 with 2),
+ Event.Value(2 with 3),
+ Event.Complete,
+ ),
+ )
+
+ range(0, 4)
+ .bufferCount(bufferSize = 2, startBufferEvery = 1)
+ .mapNotNull {
+ if (it.size < 2) {
+ null
+ } else {
+ it[0] with it[1]
+ }
+ }
+ .also { assertIs>>(it) }
+ .test(
+ listOf(
+ Event.Value(0 with 1),
+ Event.Value(1 with 2),
+ Event.Value(2 with 3),
+ Event.Complete,
+ ),
+ )
+
+ range(0, 4)
+ .zipWithNext(::MyTuple2)
+ .also { assertIs>>(it) }
+ .test(
+ listOf(
+ Event.Value(0 with 1),
+ Event.Value(1 with 2),
+ Event.Value(2 with 3),
+ Event.Complete,
+ ),
+ )
+ }
+
+ @Test
+ fun testPairwiseWithSuspend() = runTest {
+ range(0, 4)
+ .pairwise { a, b ->
+ delay(100)
+ MyTuple2(a, b)
+ }
+ .also { assertIs>>(it) }
+ .test(
+ listOf(
+ Event.Value(0 with 1),
+ Event.Value(1 with 2),
+ Event.Value(2 with 3),
+ Event.Complete,
+ ),
+ )
+ }
+
+ @Test
+ fun testPairwiseNullable() = runTest {
+ // 0 - null - 2 - null
+
+ range(0, 4)
+ .map { it.takeIf { it % 2 == 0 } }
+ .pairwise(::MyTuple2)
+ .test(
+ listOf(
+ Event.Value(0 with null),
+ Event.Value(null with 2),
+ Event.Value(2 with null),
+ Event.Complete,
+ ),
+ )
+ }
+
+ @Test
+ fun testPairwiseEmpty() = runTest {
+ emptyFlow()
+ .pairwise(::MyTuple2)
+ .test(
+ listOf(
+ Event.Complete,
+ ),
+ )
+ }
+
+ @Test
+ fun testPairwiseSingle() = runTest {
+ flowOf(1)
+ .pairwise(::MyTuple2)
+ .test(
+ listOf(
+ Event.Complete,
+ ),
+ )
+ }
+
+ @Test
+ fun testPairwiseFailureUpstream() = runTest {
+ assertFailsWith(
+ flow { throw TestException() }
+ .pairwise(::MyTuple2),
+ )
+ }
+
+ @Test
+ fun testPairwiseFailureTransform() = runTest {
+ // empty flow will not call transform function
+ emptyFlow()
+ .pairwise<_, String> { _, _ -> throw TestException("Broken!") }
+ .test(
+ listOf(
+ Event.Complete,
+ ),
+ )
+
+ // single element flow will not call transform function
+ flowOf(1)
+ .pairwise<_, String> { _, _ -> throw TestException("Broken!") }
+ .test(
+ listOf(
+ Event.Complete,
+ ),
+ )
+
+ // propagate exception from transform function
+ assertFailsWith(
+ flowOf(1, 2)
+ .pairwise<_, String> { _, _ -> throw TestException("Broken!") },
+ )
+ }
+
+ @Test
+ fun testPairwiseCancellation() = runTest {
+ range(1, 100)
+ .pairwise(::MyTuple2)
+ .take(2)
+ .test(
+ listOf(
+ Event.Value(1 with 2),
+ Event.Value(2 with 3),
+ Event.Complete,
+ ),
+ )
+ }
+}