From b91dc0fc3be01ae9568d5f3b3b508b0bc8b763ef Mon Sep 17 00:00:00 2001 From: Alejandro Serrano Mena Date: Fri, 3 Nov 2023 22:12:17 +0100 Subject: [PATCH] Start work on arrow-collectors --- .../fx/arrow-collectors/build.gradle.kts | 43 +++++ .../fx/arrow-collectors/gradle.properties | 4 + .../kotlin/arrow/collectors/Collector.kt | 147 ++++++++++++++++++ .../kotlin/arrow/collectors/Collectors.kt | 147 ++++++++++++++++++ .../kotlin/arrow/collectors/Flow.kt | 104 +++++++++++++ .../commonMain/kotlin/arrow/collectors/Zip.kt | 31 ++++ .../kotlin/arrow/collectors/CollectorsTest.kt | 34 ++++ .../kotlin/arrow/collectors/Collectors.kt | 51 ++++++ .../kotlin/arrow/collectors/JvmCollector.kt | 37 +++++ settings.gradle.kts | 3 + 10 files changed, 601 insertions(+) create mode 100644 arrow-libs/fx/arrow-collectors/build.gradle.kts create mode 100644 arrow-libs/fx/arrow-collectors/gradle.properties create mode 100644 arrow-libs/fx/arrow-collectors/src/commonMain/kotlin/arrow/collectors/Collector.kt create mode 100644 arrow-libs/fx/arrow-collectors/src/commonMain/kotlin/arrow/collectors/Collectors.kt create mode 100644 arrow-libs/fx/arrow-collectors/src/commonMain/kotlin/arrow/collectors/Flow.kt create mode 100644 arrow-libs/fx/arrow-collectors/src/commonMain/kotlin/arrow/collectors/Zip.kt create mode 100644 arrow-libs/fx/arrow-collectors/src/commonTest/kotlin/arrow/collectors/CollectorsTest.kt create mode 100644 arrow-libs/fx/arrow-collectors/src/jvmMain/kotlin/arrow/collectors/Collectors.kt create mode 100644 arrow-libs/fx/arrow-collectors/src/jvmMain/kotlin/arrow/collectors/JvmCollector.kt diff --git a/arrow-libs/fx/arrow-collectors/build.gradle.kts b/arrow-libs/fx/arrow-collectors/build.gradle.kts new file mode 100644 index 00000000000..9456b77a776 --- /dev/null +++ b/arrow-libs/fx/arrow-collectors/build.gradle.kts @@ -0,0 +1,43 @@ +@file:Suppress("DSL_SCOPE_VIOLATION") + +plugins { + id(libs.plugins.kotlin.multiplatform.get().pluginId) + alias(libs.plugins.arrowGradleConfig.kotlin) + alias(libs.plugins.arrowGradleConfig.publish) +} + +apply(from = property("ANIMALSNIFFER_MPP")) + +kotlin { + sourceSets { + commonMain { + dependencies { + api(projects.arrowFxCoroutines) + api(projects.arrowAtomic) + api(libs.coroutines.core) + implementation(libs.kotlin.stdlibCommon) + } + } + + commonTest { + dependencies { + implementation(libs.kotlin.test) + implementation(libs.kotest.frameworkEngine) + implementation(libs.kotest.assertionsCore) + implementation(libs.kotest.property) + } + } + } + + jvm { + tasks.jvmJar { + manifest { + attributes["Automatic-Module-Name"] = "arrow.collectors" + } + } + } +} + +tasks.withType { + useJUnitPlatform() +} diff --git a/arrow-libs/fx/arrow-collectors/gradle.properties b/arrow-libs/fx/arrow-collectors/gradle.properties new file mode 100644 index 00000000000..204dd6f3804 --- /dev/null +++ b/arrow-libs/fx/arrow-collectors/gradle.properties @@ -0,0 +1,4 @@ +# Maven publishing configuration +pom.name=Arrow Collectors +# Build configuration +kapt.incremental.apt=false diff --git a/arrow-libs/fx/arrow-collectors/src/commonMain/kotlin/arrow/collectors/Collector.kt b/arrow-libs/fx/arrow-collectors/src/commonMain/kotlin/arrow/collectors/Collector.kt new file mode 100644 index 00000000000..3ce5f4a9020 --- /dev/null +++ b/arrow-libs/fx/arrow-collectors/src/commonMain/kotlin/arrow/collectors/Collector.kt @@ -0,0 +1,147 @@ +package arrow.collectors + +// https://hackage.haskell.org/package/foldl-1.4.15/docs/Control-Foldl.html + +/** + * Defines how the [collect] function may run the collector. + */ +public enum class Characteristics { + /** + * The flow may be collected using several workers. + */ + CONCURRENT, + + /** + * The final step of the collector simply returns the accumulator. + * That means that final call may be replaced by a cast. + */ + IDENTITY_FINISH, + + /** + * The order in which elements are accumulated doesn't matter for the end result. + */ + UNORDERED; + + public companion object { + public val CONCURRENT_UNORDERED: Set = setOf(CONCURRENT, UNORDERED) + public val IDENTITY_CONCURRENT: Set = setOf(IDENTITY_FINISH, CONCURRENT) + public val IDENTITY_CONCURRENT_UNORDERED: Set = setOf(IDENTITY_FINISH, CONCURRENT, UNORDERED) + } +} + + +public typealias Collector = CollectorI<*, T, R> + +/** + * A [Collector] accumulates information from elements + * coming from some data source, usually a [kotlinx.coroutines.flow.Flow] or [Iterable]. + * + * The accumulation is done in three phases: + * - Initialization of some (mutable) accumulator ([supply]) + * - Updating the accumulator with each value ([accumulate]) + * - Finalize the work, and extract the final result ([finish]) + * + * This interface is heavily influenced by + * Java's [`Collector`](https://docs.oracle.com/javase/8/docs/api/java/util/stream/Collector.html) + * and Haskell's [`foldl` library](https://hackage.haskell.org/package/foldl/docs/Control-Foldl.html). + */ +public interface CollectorI { + public val characteristics: Set + public suspend fun supply(): A + public suspend fun accumulate(current: A, value: T) + public suspend fun finish(current: A): R + + /** + * Performs additional work during the finalization phase, + * by applying a function to the end result. + * + * @param finish Additional function to apply to the end result. + */ + // functor over R + public fun map( + finish: suspend (R) -> S, + ): Collector { + val me = this + return object : CollectorI { + override val characteristics: Set = + me.characteristics + + override suspend fun supply(): A = + me.supply() + + override suspend fun accumulate(current: A, value: T) = + me.accumulate(current, value) + + override suspend fun finish(current: A): S = + finish(me.finish(current)) + } + } + + /** + * Applies a function over each element in the data source, + * before giving it to the collector. + * + * @param transform Function to apply to each value + */ + // contravariant functor over T + public fun

contramap( + transform: suspend (P) -> T, + ): Collector { + val me = this + return object : CollectorI { + override val characteristics: Set = + me.characteristics + + override suspend fun supply(): A = + me.supply() + + override suspend fun accumulate(current: A, value: P) = + me.accumulate(current, transform(value)) + + override suspend fun finish(current: A): R = + me.finish(current) + } + } + + /** + * Combines two [Collector]s by performing the phases + * of each of them in parallel. + * + * @param other [Collector] to combine with [this] + */ + // applicative + public fun zip( + other: CollectorI, + ): Collector> = + this.zip(other, ::Pair) + + /** + * Combines two [Collector]s by performing the phases + * of each of them in parallel, and then combining + * the end result in a final step. + * + * @param other [Collector] to combine with [this] + * @param finish Function that combines the end results + */ + public fun zip( + other: CollectorI, + finish: (R, S) -> V, + ): Collector { + val me = this + return object : CollectorI, T, V> { + override val characteristics: Set = + me.characteristics.intersect(other.characteristics) + + override suspend fun supply(): Pair = + Pair(me.supply(), other.supply()) + + override suspend fun accumulate(current: Pair, value: T) { + me.accumulate(current.first, value) + other.accumulate(current.second, value) + } + + override suspend fun finish(current: Pair): V = + finish(me.finish(current.first), other.finish(current.second)) + } + } +} diff --git a/arrow-libs/fx/arrow-collectors/src/commonMain/kotlin/arrow/collectors/Collectors.kt b/arrow-libs/fx/arrow-collectors/src/commonMain/kotlin/arrow/collectors/Collectors.kt new file mode 100644 index 00000000000..ae7a61bacfd --- /dev/null +++ b/arrow-libs/fx/arrow-collectors/src/commonMain/kotlin/arrow/collectors/Collectors.kt @@ -0,0 +1,147 @@ +package arrow.collectors + +import arrow.atomic.Atomic +import arrow.atomic.AtomicInt +import arrow.atomic.update + +/** + * Library of [Collector]s. + */ +public object Collectors { + + /** + * Returns always the same value, regardless of the collected flow. + * + * @param value Value returns as result of the collection. + */ + public fun constant(value: R): Collector = + object : CollectorI { + override val characteristics: Set = Characteristics.IDENTITY_CONCURRENT_UNORDERED + override suspend fun supply(): R = value + override suspend fun accumulate(current: R, value: Any?) {} + override suspend fun finish(current: R): R = current + } + + /** + * Counts the number of elements in the flow. + */ + public val length: Collector = + object : CollectorI { + override val characteristics: Set = Characteristics.CONCURRENT_UNORDERED + override suspend fun supply(): AtomicInt = AtomicInt(0) + override suspend fun accumulate(current: AtomicInt, value: Any?) { + current.incrementAndGet() + } + + override suspend fun finish(current: AtomicInt): Int = current.get() + } + + /** + * Sum of all the values in the flow. + */ + public val sum: Collector = + intReducer({ 0 }, Int::plus) + + public fun intReducer( + initial: () -> Int, combine: (Int, Int) -> Int, + ): Collector = + object : CollectorI { + override val characteristics: Set = emptySet() + override suspend fun supply(): AtomicInt = AtomicInt(initial()) + override suspend fun accumulate(current: AtomicInt, value: Int) { + current.update { combine(it, value) } + } + + override suspend fun finish(current: AtomicInt): Int = current.get() + } + + public fun reducer( + initial: () -> M, combine: (M, M) -> M, unordered: Boolean = false, + ): Collector = + object : CollectorI, M, M> { + override val characteristics: Set = + if (unordered) Characteristics.CONCURRENT_UNORDERED + else emptySet() + + override suspend fun supply(): Atomic = Atomic(initial()) + override suspend fun accumulate(current: Atomic, value: M) { + current.update { combine(it, value) } + } + + override suspend fun finish(current: Atomic): M = current.get() + } + + /** + * Returns the "best" value from the value. + * + * @param selectNew Decides whether the new value is "better" than the previous best. + */ + public fun bestBy( + selectNew: (old: M, new: M) -> Boolean, + ): Collector = + object : CollectorI, M, M?> { + override val characteristics: Set = Characteristics.CONCURRENT_UNORDERED + override suspend fun supply(): Atomic = Atomic(null) + override suspend fun accumulate(current: Atomic, value: M) { + current.update { old -> + if (old == null) value + else if (selectNew(old, value)) value + else old + } + } + + override suspend fun finish(current: Atomic): M? = current.get() + } + + /** + * Collects all the values in a list, in the order in which they are emitted. + */ + public fun list(): Collector> = + object : CollectorI, T, List> { + override val characteristics: Set = setOf(Characteristics.IDENTITY_FINISH) + override suspend fun supply(): MutableList = mutableListOf() + override suspend fun accumulate(current: MutableList, value: T) { + current.add(value) + } + + override suspend fun finish(current: MutableList): List = current + } + + /** + * Collects all the values in a set. + */ + public fun set(): Collector> = + object : CollectorI, T, Set> { + override val characteristics: Set = + setOf(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED) + + override suspend fun supply(): MutableSet = mutableSetOf() + override suspend fun accumulate(current: MutableSet, value: T) { + current.add(value) + } + + override suspend fun finish(current: MutableSet): Set = current + } + + /** + * Collects all the values in a map. + */ + public fun mapFromEntries(): Collector, Map> = + map().contramap { (k, v) -> k to v } + + /** + * Collects all the values in a map. + */ + public fun map(): Collector, Map> = + object : CollectorI, Pair, Map> { + override val characteristics: Set = + setOf(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED) + + override suspend fun supply(): MutableMap = mutableMapOf() + override suspend fun accumulate(current: MutableMap, value: Pair) { + current[value.first] = value.second + } + + override suspend fun finish(current: MutableMap): Map = current + } +} diff --git a/arrow-libs/fx/arrow-collectors/src/commonMain/kotlin/arrow/collectors/Flow.kt b/arrow-libs/fx/arrow-collectors/src/commonMain/kotlin/arrow/collectors/Flow.kt new file mode 100644 index 00000000000..bc28db3e5f4 --- /dev/null +++ b/arrow-libs/fx/arrow-collectors/src/commonMain/kotlin/arrow/collectors/Flow.kt @@ -0,0 +1,104 @@ +package arrow.collectors + +import arrow.fx.coroutines.parMap +import arrow.fx.coroutines.parMapUnordered +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.DEFAULT_CONCURRENCY +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onCompletion +import kotlinx.coroutines.flow.onStart +import kotlin.coroutines.Continuation +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext +import kotlin.coroutines.startCoroutine + +/** + * Performs collection over the elements of [this]. + * The amount of concurrency depends on the + * [Characteristics] of [collector], and can + * be tweaked using the [concurrency] parameter. + * + * [this] is iterated only once during collection. + * + * Note: if you need to perform changes on the values + * before collection, we strongly recommend to convert + * the [Iterable] into a [Flow] using [asFlow], + * perform those changes, and then using [collect]. + * + * @receiver Sequence of values to collect + * @param collector Describes how to collect the values + * @param concurrency Defines the concurrency limit + */ +@OptIn(FlowPreview::class) +public suspend fun Iterable.collect( + collector: Collector, + concurrency: Int = DEFAULT_CONCURRENCY, +): R = asFlow().collect(collector, concurrency) + +public fun Iterable.collectBlocking( + collector: Collector, +): R = asFlow().collectBlocking(collector) + +/** + * Performs collection over the elements of [this]. + * The amount of concurrency depends on the + * [Characteristics] of [collector], and can + * be tweaked using the [concurrency] parameter. + * + * [this] is consumed only once during collection. + * We recommend using a cold [Flow] to ensure that + * elements are produced only when needed. + * + * @receiver [Flow] of elements to collect + * @param collector Describes how to collect the values + * @param concurrency Defines the concurrency limit + */ +@OptIn(FlowPreview::class) +public suspend fun Flow.collect( + collector: Collector, + concurrency: Int = DEFAULT_CONCURRENCY, +): R = collectI(collector, concurrency) + +@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class) +@Suppress("UNINITIALIZED_VARIABLE", "UNCHECKED_CAST") +internal suspend fun Flow.collectI( + collector: CollectorI, + concurrency: Int = DEFAULT_CONCURRENCY, +): R { + var result: A + val started = this.onStart { result = collector.supply() } + val continued = when { + Characteristics.CONCURRENT in collector.characteristics -> when { + Characteristics.UNORDERED in collector.characteristics -> + started.parMapUnordered(concurrency) { collector.accumulate(result, it) } + + else -> + started.parMap(concurrency) { collector.accumulate(result, it) } + } + + else -> started.map { collector.accumulate(result, it) } + } + var completed: R + continued.onCompletion { + completed = when { + Characteristics.IDENTITY_FINISH in collector.characteristics -> result as R + else -> collector.finish(result) + } + }.collect { } + return completed +} + +@Suppress("UNINITIALIZED_VARIABLE") +public fun Flow.collectBlocking(collector: Collector): R { + var final: R + suspend { collect(collector) }.startCoroutine(object : Continuation { + override val context: CoroutineContext = EmptyCoroutineContext + override fun resumeWith(result: Result) { + final = result.getOrThrow() + } + }) + return final +} diff --git a/arrow-libs/fx/arrow-collectors/src/commonMain/kotlin/arrow/collectors/Zip.kt b/arrow-libs/fx/arrow-collectors/src/commonMain/kotlin/arrow/collectors/Zip.kt new file mode 100644 index 00000000000..374e6bfca4e --- /dev/null +++ b/arrow-libs/fx/arrow-collectors/src/commonMain/kotlin/arrow/collectors/Zip.kt @@ -0,0 +1,31 @@ +package arrow.collectors + +/** + * Combines two [Collector]s by performing the phases + * of each of them in parallel. + * + * @param x First [Collector] + * @param y Second [Collector] + * @param combine Function that combines the end results + */ +public fun zip( + x: Collector, + y: Collector, + combine: (R, S) -> V, +): Collector = x.zip(y, combine) + +/** + * Combines three [Collector]s by performing the phases + * of each of them in parallel. + * + * @param x First [Collector] + * @param y Second [Collector] + * @param z Third [Collector] + * @param combine Function that combines the end results + */ +public fun zip( + x: Collector, + y: Collector, + z: Collector, + combine: (R, S, T) -> V, +): Collector = x.zip(y).zip(z) { (a, b), c -> combine(a, b, c) } diff --git a/arrow-libs/fx/arrow-collectors/src/commonTest/kotlin/arrow/collectors/CollectorsTest.kt b/arrow-libs/fx/arrow-collectors/src/commonTest/kotlin/arrow/collectors/CollectorsTest.kt new file mode 100644 index 00000000000..98a6614111a --- /dev/null +++ b/arrow-libs/fx/arrow-collectors/src/commonTest/kotlin/arrow/collectors/CollectorsTest.kt @@ -0,0 +1,34 @@ +package arrow.collectors + +import io.kotest.matchers.shouldBe +import io.kotest.property.Arb +import io.kotest.property.PropertyContext +import io.kotest.property.arbitrary.int +import io.kotest.property.arbitrary.list +import io.kotest.property.checkAll +import kotlinx.coroutines.test.TestResult +import kotlinx.coroutines.test.runTest +import kotlin.test.Test + +class CollectorsTest { + fun runTestOverLists( + block: suspend PropertyContext.(List) -> Unit + ): TestResult = runTest { + checkAll(Arb.list(Arb.int()), block) + } + + @Test + fun lengthWorks() = runTestOverLists { + it.collect(Collectors.length) shouldBe it.size + } + + @Test + fun sumWorks() = runTestOverLists { + it.collect(Collectors.sum) shouldBe it.sum() + } + + @Test + fun bestWorks() = runTestOverLists { + it.collect(Collectors.bestBy { old, new -> new > old }) shouldBe it.maxOrNull() + } +} diff --git a/arrow-libs/fx/arrow-collectors/src/jvmMain/kotlin/arrow/collectors/Collectors.kt b/arrow-libs/fx/arrow-collectors/src/jvmMain/kotlin/arrow/collectors/Collectors.kt new file mode 100644 index 00000000000..832c8d60877 --- /dev/null +++ b/arrow-libs/fx/arrow-collectors/src/jvmMain/kotlin/arrow/collectors/Collectors.kt @@ -0,0 +1,51 @@ +package arrow.collectors + +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ConcurrentHashMap.KeySetView +import java.util.concurrent.ConcurrentMap + +/** + * Collects all the values in a map. + * + * This [Collector] supports concurrency, + * so it's potential faster than [Collectors.mapFromEntries]. + */ +@Suppress("UnusedReceiverParameter") +public fun Collectors.concurrentMapFromEntries(): Collector, ConcurrentMap> = + Collectors.concurrentMap().contramap { (k, v) -> k to v } + +/** + * Collects all the values in a map. + * + * This [Collector] supports concurrency, + * so it's potential faster than [Collectors.map]. + */ +@Suppress("UnusedReceiverParameter") +public fun Collectors.concurrentMap(): Collector, ConcurrentMap> = + object : CollectorI, Pair, ConcurrentMap> { + override val characteristics: Set = Characteristics.IDENTITY_CONCURRENT_UNORDERED + override suspend fun supply(): ConcurrentHashMap = ConcurrentHashMap() + override suspend fun accumulate(current: ConcurrentHashMap, value: Pair) { + current[value.first] = value.second + } + + override suspend fun finish(current: ConcurrentHashMap): ConcurrentMap = current + } + +/** + * Collects all the values in a set. + * + * This [Collector] supports concurrency, + * so it's potential faster than [Collectors.set]. + */ +@Suppress("UnusedReceiverParameter") +public fun Collectors.concurrentSet(): Collector> = + object : CollectorI, T, Set> { + override val characteristics: Set = Characteristics.IDENTITY_CONCURRENT_UNORDERED + override suspend fun supply(): KeySetView = ConcurrentHashMap().keySet(Unit) + override suspend fun accumulate(current: KeySetView, value: T) { + current.add(value) + } + + override suspend fun finish(current: KeySetView): Set = current + } diff --git a/arrow-libs/fx/arrow-collectors/src/jvmMain/kotlin/arrow/collectors/JvmCollector.kt b/arrow-libs/fx/arrow-collectors/src/jvmMain/kotlin/arrow/collectors/JvmCollector.kt new file mode 100644 index 00000000000..375213c9e4f --- /dev/null +++ b/arrow-libs/fx/arrow-collectors/src/jvmMain/kotlin/arrow/collectors/JvmCollector.kt @@ -0,0 +1,37 @@ +package arrow.collectors + +/** + * Wraps a [java.util.stream.Collector] to use with [collect]. + */ +public fun java.util.stream.Collector.asCollector(): Collector = + Collectors.jvm(this) + +/** + * Wraps a [java.util.stream.Collector] to use with [collect]. + */ +@Suppress("UnusedReceiverParameter") +public fun Collectors.jvm( + collector: java.util.stream.Collector, +): Collector = JvmCollector(collector) + +@JvmInline +private value class JvmCollector( + private val collector: java.util.stream.Collector, +) : CollectorI { + override val characteristics: Set + get() { + val original = collector.characteristics() + return setOfNotNull( + Characteristics.CONCURRENT.takeIf { java.util.stream.Collector.Characteristics.CONCURRENT in original }, + Characteristics.IDENTITY_FINISH.takeIf { java.util.stream.Collector.Characteristics.IDENTITY_FINISH in original }, + Characteristics.UNORDERED.takeIf { java.util.stream.Collector.Characteristics.UNORDERED in original }, + ) + } + + override suspend fun supply(): A = collector.supplier().get() + override suspend fun accumulate(current: A, value: T) { + collector.accumulator().accept(current, value) + } + + override suspend fun finish(current: A): R = collector.finisher().apply(current) +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 3957a87e1a0..3c9494210f1 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -63,6 +63,9 @@ project(":arrow-fx-coroutines").projectDir = file("arrow-libs/fx/arrow-fx-corout include("arrow-fx-stm") project(":arrow-fx-stm").projectDir = file("arrow-libs/fx/arrow-fx-stm") +include("arrow-collectors") +project(":arrow-collectors").projectDir = file("arrow-libs/fx/arrow-collectors") + include("arrow-resilience") project(":arrow-resilience").projectDir = file("arrow-libs/resilience/arrow-resilience")