Skip to content

Commit

Permalink
done scanWith
Browse files Browse the repository at this point in the history
  • Loading branch information
hoc081098 committed Oct 6, 2023
1 parent 34d07d9 commit d288e67
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 0 deletions.
37 changes: 37 additions & 0 deletions src/commonMain/kotlin/com/hoc081098/flowext/scanWith.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,47 @@ package com.hoc081098.flowext

import kotlin.experimental.ExperimentalTypeInference
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.scan

/**
* Folds the given flow with [operation], emitting every intermediate result,
* including the initial value supplied by [initialSupplier] at the collection time.
*
* Note that the returned initial value should be immutable (or should not be mutated)
* as it is shared between different collectors.
*
* This is a variant of [scan] that the initial value is lazily supplied,
* which is useful when the initial value is expensive to create
* or depends on a logic that should be executed at the collection time (lazy semantics).
*
* For example:
* ```kotlin
* flowOf(1, 2, 3)
* .scanWith({ emptyList<Int>() }) { acc, value -> acc + value }
* .toList()
* ```
* will produce `[[], [1], [1, 2], [1, 2, 3]]`.
*
* Another example:
* ```kotlin
* // Logic to calculate initial value (e.g. call API, read from DB, etc.)
* suspend fun calculateInitialValue(): Int {
* println("calculateInitialValue")
* delay(1000)
* return 0
* }
*
* flowOf(1, 2, 3).scanWith(::calculateInitialValue) { acc, value -> acc + value }
* ```
*
* @param initialSupplier a function that returns the initial (seed) accumulator value for each individual collector.
* @param operation an accumulator function to be invoked on each item emitted by the current [Flow],
* whose result will be emitted to collector via [FlowCollector.emit]
* and used in the next accumulator call.
*/
@OptIn(ExperimentalTypeInference::class)
public fun <T, R> Flow<T>.scanWith(
initialSupplier: suspend () -> R,
Expand Down
10 changes: 10 additions & 0 deletions src/commonTest/kotlin/com/hoc081098/flowext/ScanWithTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,20 @@ class ScanWithTest : BaseStepTest() {
@Test
fun callInitialSupplierPerCollection() = runTest {
var initial = 100
var nextIndex = 0

val flow = flowOf(1, 2, 3, 4)
.scanWith(
initialSupplier = {
expect(nextIndex)

delay(100)
++initial
},
) { acc, value -> acc + value }

expect(1)
nextIndex = 2
flow.test(
listOf(
Event.Value(101),
Expand All @@ -63,6 +68,9 @@ class ScanWithTest : BaseStepTest() {
),
)

expect(3)
nextIndex = 4

flow.test(
listOf(
Event.Value(102),
Expand All @@ -73,6 +81,8 @@ class ScanWithTest : BaseStepTest() {
Event.Complete,
),
)

finish(5)
}

@Test
Expand Down

0 comments on commit d288e67

Please sign in to comment.