Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(zipWithNext, pairwise): add zipWithNext, add pairwise(transform) #182

Merged
merged 10 commits into from
Oct 12, 2023
2 changes: 1 addition & 1 deletion .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
### Added

- Add `Flow.chunked` operator, it is an alias to `Flow.bufferCount` operator.
- Add `Flow.pairwise(transform)` operator - a variant of `Flow.pairwise()` operator,
which allows the transformation of the pair of values via the `transform` lambda parameter.

- Add `Flow.zipWithNext()` operator, it is an alias to `Flow.pairwise()` operator.
- Add `Flow.zipWithNext(transform)` operator, it is an alias to `Flow.pairwise(transform)` operator.

## [0.7.2] - Oct 7, 2023

Expand Down
23 changes: 19 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@
> Kotlinx Coroutines Flow Extensions. Extensions to the Kotlin Flow library. Kotlin Flow extensions.
> Multiplatform Kotlinx Coroutines Flow Extensions. Multiplatform Extensions to the Kotlin Flow
> library. Multiplatform Kotlin Flow extensions. RxJS Kotlin Coroutines Flow. RxSwift Kotlin
> Coroutines Flow. RxJava Kotlin Coroutines Flow. RxJS Kotlin Flow. RxSwift Kotlin Flow. RxJava Kotlin
> Flow. RxJS Coroutines Flow. RxSwift Coroutines Flow. RxJava Coroutines Flow.
> Coroutines Flow. RxJava Kotlin Coroutines Flow. RxJS Kotlin Flow. RxSwift Kotlin Flow. RxJava
> Kotlin
> Flow. RxJS Coroutines Flow. RxSwift Coroutines Flow. RxJava Coroutines Flow. Kotlin Flow
> operators.
> Coroutines Flow operators.

## Author: [Petrus Nguyễn Thái Học](https://github.com/hoc081098)

Expand Down Expand Up @@ -145,7 +148,7 @@ dependencies {
- [`dematerialize`](#dematerialize)
- [`raceWith`](#racewith--ambwith)
- [`ambWith`](#racewith--ambwith)
- [`pairwise`](#pairwise)
- [`pairwise`](#pairwise--zipWithNext)
- [`repeat`](#repeat)
- [`retryWhenWithDelayStrategy`](#retrywhenwithdelaystrategy)
- [`retryWhenWithExponentialBackoff`](#retrywhenwithexponentialbackoff)
Expand All @@ -157,6 +160,7 @@ dependencies {
- [`takeUntil`](#takeuntil)
- [`throttleTime`](#throttletime)
- [`withLatestFrom`](#withlatestfrom)
- [`zipWithNext`](#pairwise--zipWithNext)

#### bufferCount / chunked

Expand Down Expand Up @@ -871,18 +875,25 @@ raceWith: 3

----

#### pairwise
#### pairwise / zipWithNext

- Similar to [RxJS pairwise](https://rxjs.dev/api/operators/pairwise)

Groups pairs of consecutive emissions together and emits them as a pair.
Emits the `(n)th` and `(n-1)th` events as a pair.
The first value won't be emitted until the second one arrives.
Note, `zipWithNext` is an alias to `pairwise`.

```kotlin
range(0, 4)
.pairwise()
.collect { println("pairwise: $it") }

println("---")

range(0, 4)
.zipWithNext { a, b -> "$a -> $b" }
.collect { println("zipWithNext: $it") }
```

Output:
Expand All @@ -891,6 +902,10 @@ Output:
pairwise: (0, 1)
pairwise: (1, 2)
pairwise: (2, 3)
---
zipWithNext: 0 -> 1
zipWithNext: 1 -> 2
zipWithNext: 2 -> 3
```

----
Expand Down
6 changes: 6 additions & 0 deletions api/FlowExt.api
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ public final class com/hoc081098/flowext/DelayStrategy$NoDelayStrategy : com/hoc
public fun nextDelay-3nIYWDw (Ljava/lang/Throwable;J)J
}

public abstract interface annotation class com/hoc081098/flowext/DelicateFlowExtApi : java/lang/annotation/Annotation {
}

public final class com/hoc081098/flowext/EagerKt {
public static final fun flatMapConcatEager (Lkotlinx/coroutines/flow/Flow;IILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun flatMapConcatEager$default (Lkotlinx/coroutines/flow/Flow;IILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
Expand Down Expand Up @@ -183,6 +186,9 @@ public final class com/hoc081098/flowext/NeverFlowKt {

public final class com/hoc081098/flowext/PairwiseKt {
public static final fun pairwise (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun pairwise (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun zipWithNext (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun zipWithNext (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
}

public final class com/hoc081098/flowext/RaceKt {
Expand Down
8 changes: 8 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ repositories {
kotlin {
explicitApi()

sourceSets {
all {
languageSettings {
optIn("com.hoc081098.flowext.DelicateFlowExtApi")
}
}
}

jvmToolchain {
languageVersion.set(JavaLanguageVersion.of(17))
vendor.set(JvmVendorSpec.AZUL)
Expand Down
41 changes: 41 additions & 0 deletions src/commonMain/kotlin/com/hoc081098/flowext/DelicateFlowExtApi.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* MIT License
*
* Copyright (c) 2021-2023 Petrus Nguyễn Thái Học
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.hoc081098.flowext

/**
* Marks declarations in the `FlowExt` that are **delicate** —
* they have limited use-case and shall be used with care in general code.
* Any use of a delicate declaration has to be carefully reviewed to make sure it is
* properly used and does not create problems like memory and resource leaks.
* Carefully read documentation of any declaration marked as `DelicateFlowExtApi`.
*/
@MustBeDocumented
@Retention(value = AnnotationRetention.BINARY)
@RequiresOptIn(
level = RequiresOptIn.Level.WARNING,
message = "This is a delicate API and its use requires care." +
" Make sure you fully read and understand documentation of the declaration that is marked as a delicate API.",
)
public annotation class DelicateFlowExtApi
66 changes: 60 additions & 6 deletions src/commonMain/kotlin/com/hoc081098/flowext/pairwise.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,18 @@

package com.hoc081098.flowext

import com.hoc081098.flowext.utils.NULL_VALUE
import com.hoc081098.flowext.utils.INTERNAL_NULL_VALUE
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

// ------------------------------------------- PAIRWISE -------------------------------------------

/**
* Groups pairs of consecutive emissions together and emits them as a pair.
*
* Emits the `(n)th` and `(n-1)th` events as a pair.
* The first value won't be emitted until the second one arrives.
* The resulting [Flow] is empty if this [Flow] emits less than two elements.
*
* This operator is more optimizer than [bufferCount] version:
* ```kotlin
Expand All @@ -46,13 +49,64 @@ import kotlinx.coroutines.flow.flow
* }
* ```
*/
public fun <T> Flow<T>.pairwise(): Flow<Pair<T, T>> = flow {
var last: Any? = null
public fun <T> Flow<T>.pairwise(): Flow<Pair<T, T>> = pairwiseInternal(::Pair)

/**
* Groups pairs of consecutive emissions together and emits the result of applying [transform]
* function to each pair.
*
* The first value won't be emitted until the second one arrives.
* The resulting [Flow] is empty if this [Flow] emits less than two elements.
*
* This operator is more optimizer than [bufferCount] version:
* ```kotlin
* val flow: Flow<T>
*
* val result: Flow<R> = flow
* .bufferCount(bufferSize = 2, startBufferEvery = 1)
* .mapNotNull {
* if (it.size < 2) null
* else transform(it[0], it[1])
* }
* ```
*
* @param transform A function to apply to each pair of consecutive emissions.
*/
public fun <T, R> Flow<T>.pairwise(transform: suspend (a: T, b: T) -> R): Flow<R> =
pairwiseInternal(transform)

// ----------------------------------------- ZIP WITH NEXT -----------------------------------------

/**
* This function is an alias to [pairwise] operator.
*
* Groups pairs of consecutive emissions together and emits them as a pair.
*
* @see pairwise
*/
public fun <T> Flow<T>.zipWithNext(): Flow<Pair<T, T>> = pairwise()

/**
* This function is an alias to [pairwise] operator.
*
* Groups pairs of consecutive emissions together and emits the result of applying [transform]
* function to each pair.
*
* @see pairwise
*/
public fun <T, R> Flow<T>.zipWithNext(transform: suspend (a: T, b: T) -> R): Flow<R> =
pairwise(transform)

// ------------------------------------------- INTERNAL -------------------------------------------

private fun <T, R> Flow<T>.pairwiseInternal(transform: suspend (a: T, b: T) -> R): Flow<R> = flow {
var last: Any? = INTERNAL_NULL_VALUE

collect {
if (last !== null) {
emit(Pair(NULL_VALUE.unbox(last), it))
if (last !== INTERNAL_NULL_VALUE) {
@Suppress("UNCHECKED_CAST")
emit(transform(last as T, it))
}
last = it ?: NULL_VALUE
last = it
}
}
10 changes: 5 additions & 5 deletions src/commonMain/kotlin/com/hoc081098/flowext/selectors.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

package com.hoc081098.flowext

import com.hoc081098.flowext.utils.NULL_VALUE
import com.hoc081098.flowext.utils.INTERNAL_NULL_VALUE
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flow
Expand All @@ -36,12 +36,12 @@ private typealias SubStateT = Any?
internal fun <State, Result> Flow<State>.select1Internal(
selector: suspend (State) -> Result,
): Flow<Result> = flow {
var latestState: Any? = NULL_VALUE // Result | NULL_VALUE
var latestState: Any? = INTERNAL_NULL_VALUE // Result | NULL_VALUE

distinctUntilChanged().collect { state ->
val currentState = selector(state)

if (latestState === NULL_VALUE || (latestState as Result) != currentState) {
if (latestState === INTERNAL_NULL_VALUE || (latestState as Result) != currentState) {
latestState = currentState
emit(currentState)
}
Expand All @@ -56,7 +56,7 @@ private fun <State, Result> Flow<State>.selectInternal(

return flow {
var latestSubStates: Array<SubStateT>? = null
var latestState: Any? = NULL_VALUE // Result | NULL_VALUE
var latestState: Any? = INTERNAL_NULL_VALUE // Result | NULL_VALUE
var reusableSubStates: Array<SubStateT>? = null

distinctUntilChanged().collect { state ->
Expand All @@ -74,7 +74,7 @@ private fun <State, Result> Flow<State>.selectInternal(
.also { latestSubStates = it },
)

if (latestState === NULL_VALUE || (latestState as Result) != currentState) {
if (latestState === INTERNAL_NULL_VALUE || (latestState as Result) != currentState) {
latestState = currentState
emit(currentState)
}
Expand Down
8 changes: 4 additions & 4 deletions src/commonMain/kotlin/com/hoc081098/flowext/throttle.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import com.hoc081098.flowext.ThrottleConfiguration.LEADING
import com.hoc081098.flowext.ThrottleConfiguration.LEADING_AND_TRAILING
import com.hoc081098.flowext.ThrottleConfiguration.TRAILING
import com.hoc081098.flowext.internal.DONE_VALUE
import com.hoc081098.flowext.utils.NULL_VALUE
import com.hoc081098.flowext.utils.INTERNAL_NULL_VALUE
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlinx.coroutines.ExperimentalCoroutinesApi
Expand Down Expand Up @@ -244,7 +244,7 @@ public fun <T> Flow<T>.throttleTime(

// Produce the values using the default (rendezvous) channel
val values = produce {
collect { value -> send(value ?: NULL_VALUE) }
collect { value -> send(value ?: INTERNAL_NULL_VALUE) }
}

var lastValue: Any? = null
Expand All @@ -258,7 +258,7 @@ public fun <T> Flow<T>.throttleTime(
// before we emit, otherwise reentrant code can cause
// issues here.
lastValue = null // Consume the value
return@let downstream.emit(NULL_VALUE.unbox(consumed))
return@let downstream.emit(INTERNAL_NULL_VALUE.unbox(consumed))
}
}

Expand Down Expand Up @@ -290,7 +290,7 @@ public fun <T> Flow<T>.throttleTime(
trySend()
}

when (val duration = durationSelector(NULL_VALUE.unbox(value))) {
when (val duration = durationSelector(INTERNAL_NULL_VALUE.unbox(value))) {
Duration.ZERO -> onWindowClosed()
else -> throttled = scope.launch { delay(duration) }
}
Expand Down
12 changes: 12 additions & 0 deletions src/commonMain/kotlin/com/hoc081098/flowext/utils/NULL_VALUE.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,24 @@

package com.hoc081098.flowext.utils

import com.hoc081098.flowext.DelicateFlowExtApi
import kotlin.jvm.JvmField

/**
* This is a work-around for having nested nulls in generic code.
* This allows for writing faster generic code instead of using `Option`.
* This is only used as an optimisation technique in low-level code.
*/
@DelicateFlowExtApi
@JvmField
public val NULL_VALUE: Symbol = Symbol("NULL_VALUE")

/**
* This is a work-around for having nested nulls in generic code.
* This allows for writing faster generic code instead of using `Option`.
* This is only used as an optimisation technique in low-level code.
*
* This is internal and should not be used outside of the library.
*/
@JvmField
internal val INTERNAL_NULL_VALUE: Symbol = Symbol("INTERNAL_NULL_VALUE")
2 changes: 2 additions & 0 deletions src/commonMain/kotlin/com/hoc081098/flowext/utils/Symbol.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@

package com.hoc081098.flowext.utils

import com.hoc081098.flowext.DelicateFlowExtApi
import kotlin.jvm.JvmField

/**
* A symbol class that is used to define unique constants that are self-explanatory in debugger.
*/
@DelicateFlowExtApi
public class Symbol(
@JvmField public val symbol: String,
) {
Expand Down
6 changes: 3 additions & 3 deletions src/commonMain/kotlin/com/hoc081098/flowext/withLatestFrom.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
package com.hoc081098.flowext

import com.hoc081098.flowext.internal.AtomicRef
import com.hoc081098.flowext.utils.NULL_VALUE
import com.hoc081098.flowext.utils.INTERNAL_NULL_VALUE
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
Expand All @@ -49,14 +49,14 @@ public fun <A, B, R> Flow<A>.withLatestFrom(
try {
coroutineScope {
launch(start = CoroutineStart.UNDISPATCHED) {
other.collect { otherRef.value = it ?: NULL_VALUE }
other.collect { otherRef.value = it ?: INTERNAL_NULL_VALUE }
}

collect { value ->
emit(
transform(
value,
NULL_VALUE.unbox(otherRef.value ?: return@collect),
INTERNAL_NULL_VALUE.unbox(otherRef.value ?: return@collect),
),
)
}
Expand Down
Loading