Skip to content

Commit

Permalink
Start work on arrow-collectors
Browse files Browse the repository at this point in the history
  • Loading branch information
serras committed Nov 3, 2023
1 parent 1a2927b commit b91dc0f
Show file tree
Hide file tree
Showing 10 changed files with 601 additions and 0 deletions.
43 changes: 43 additions & 0 deletions arrow-libs/fx/arrow-collectors/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
@file:Suppress("DSL_SCOPE_VIOLATION")

plugins {
id(libs.plugins.kotlin.multiplatform.get().pluginId)
alias(libs.plugins.arrowGradleConfig.kotlin)
alias(libs.plugins.arrowGradleConfig.publish)
}

apply(from = property("ANIMALSNIFFER_MPP"))

kotlin {
sourceSets {
commonMain {
dependencies {
api(projects.arrowFxCoroutines)
api(projects.arrowAtomic)
api(libs.coroutines.core)
implementation(libs.kotlin.stdlibCommon)
}
}

commonTest {
dependencies {
implementation(libs.kotlin.test)
implementation(libs.kotest.frameworkEngine)
implementation(libs.kotest.assertionsCore)
implementation(libs.kotest.property)
}
}
}

jvm {
tasks.jvmJar {
manifest {
attributes["Automatic-Module-Name"] = "arrow.collectors"
}
}
}
}

tasks.withType<Test> {
useJUnitPlatform()
}
4 changes: 4 additions & 0 deletions arrow-libs/fx/arrow-collectors/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Maven publishing configuration
pom.name=Arrow Collectors
# Build configuration
kapt.incremental.apt=false
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package arrow.collectors

// https://hackage.haskell.org/package/foldl-1.4.15/docs/Control-Foldl.html

/**
* Defines how the [collect] function may run the collector.
*/
public enum class Characteristics {
/**
* The flow may be collected using several workers.
*/
CONCURRENT,

/**
* The final step of the collector simply returns the accumulator.
* That means that final call may be replaced by a cast.
*/
IDENTITY_FINISH,

/**
* The order in which elements are accumulated doesn't matter for the end result.
*/
UNORDERED;

public companion object {
public val CONCURRENT_UNORDERED: Set<Characteristics> = setOf(CONCURRENT, UNORDERED)
public val IDENTITY_CONCURRENT: Set<Characteristics> = setOf(IDENTITY_FINISH, CONCURRENT)
public val IDENTITY_CONCURRENT_UNORDERED: Set<Characteristics> = setOf(IDENTITY_FINISH, CONCURRENT, UNORDERED)
}
}


public typealias Collector<T, R> = CollectorI<*, T, R>

/**
* A [Collector] accumulates information from elements
* coming from some data source, usually a [kotlinx.coroutines.flow.Flow] or [Iterable].
*
* The accumulation is done in three phases:
* - Initialization of some (mutable) accumulator ([supply])
* - Updating the accumulator with each value ([accumulate])
* - Finalize the work, and extract the final result ([finish])
*
* This interface is heavily influenced by
* Java's [`Collector`](https://docs.oracle.com/javase/8/docs/api/java/util/stream/Collector.html)
* and Haskell's [`foldl` library](https://hackage.haskell.org/package/foldl/docs/Control-Foldl.html).
*/
public interface CollectorI<A, in T, out R> {
public val characteristics: Set<Characteristics>
public suspend fun supply(): A
public suspend fun accumulate(current: A, value: T)
public suspend fun finish(current: A): R

/**
* Performs additional work during the finalization phase,
* by applying a function to the end result.
*
* @param finish Additional function to apply to the end result.
*/
// functor over R
public fun <S> map(
finish: suspend (R) -> S,
): Collector<T, S> {
val me = this
return object : CollectorI<A, T, S> {
override val characteristics: Set<Characteristics> =
me.characteristics

override suspend fun supply(): A =
me.supply()

override suspend fun accumulate(current: A, value: T) =
me.accumulate(current, value)

override suspend fun finish(current: A): S =
finish(me.finish(current))
}
}

/**
* Applies a function over each element in the data source,
* before giving it to the collector.
*
* @param transform Function to apply to each value
*/
// contravariant functor over T
public fun <P> contramap(
transform: suspend (P) -> T,
): Collector<P, R> {
val me = this
return object : CollectorI<A, P, R> {
override val characteristics: Set<Characteristics> =
me.characteristics

override suspend fun supply(): A =
me.supply()

override suspend fun accumulate(current: A, value: P) =
me.accumulate(current, transform(value))

override suspend fun finish(current: A): R =
me.finish(current)
}
}

/**
* Combines two [Collector]s by performing the phases
* of each of them in parallel.
*
* @param other [Collector] to combine with [this]
*/
// applicative
public fun <B, S> zip(
other: CollectorI<B, @UnsafeVariance T, S>,
): Collector<T, Pair<R, S>> =
this.zip(other, ::Pair)

/**
* Combines two [Collector]s by performing the phases
* of each of them in parallel, and then combining
* the end result in a final step.
*
* @param other [Collector] to combine with [this]
* @param finish Function that combines the end results
*/
public fun <B, S, V> zip(
other: CollectorI<B, @UnsafeVariance T, S>,
finish: (R, S) -> V,
): Collector<T, V> {
val me = this
return object : CollectorI<Pair<A, B>, T, V> {
override val characteristics: Set<Characteristics> =
me.characteristics.intersect(other.characteristics)

override suspend fun supply(): Pair<A, B> =
Pair(me.supply(), other.supply())

override suspend fun accumulate(current: Pair<A, B>, value: T) {
me.accumulate(current.first, value)
other.accumulate(current.second, value)
}

override suspend fun finish(current: Pair<A, B>): V =
finish(me.finish(current.first), other.finish(current.second))
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package arrow.collectors

import arrow.atomic.Atomic
import arrow.atomic.AtomicInt
import arrow.atomic.update

/**
* Library of [Collector]s.
*/
public object Collectors {

/**
* Returns always the same value, regardless of the collected flow.
*
* @param value Value returns as result of the collection.
*/
public fun <R> constant(value: R): Collector<Any?, R> =
object : CollectorI<R, Any?, R> {
override val characteristics: Set<Characteristics> = Characteristics.IDENTITY_CONCURRENT_UNORDERED
override suspend fun supply(): R = value
override suspend fun accumulate(current: R, value: Any?) {}
override suspend fun finish(current: R): R = current
}

/**
* Counts the number of elements in the flow.
*/
public val length: Collector<Any?, Int> =
object : CollectorI<AtomicInt, Any?, Int> {
override val characteristics: Set<Characteristics> = Characteristics.CONCURRENT_UNORDERED
override suspend fun supply(): AtomicInt = AtomicInt(0)
override suspend fun accumulate(current: AtomicInt, value: Any?) {
current.incrementAndGet()
}

override suspend fun finish(current: AtomicInt): Int = current.get()
}

/**
* Sum of all the values in the flow.
*/
public val sum: Collector<Int, Int> =
intReducer({ 0 }, Int::plus)

public fun intReducer(
initial: () -> Int, combine: (Int, Int) -> Int,
): Collector<Int, Int> =
object : CollectorI<AtomicInt, Int, Int> {
override val characteristics: Set<Characteristics> = emptySet()
override suspend fun supply(): AtomicInt = AtomicInt(initial())
override suspend fun accumulate(current: AtomicInt, value: Int) {
current.update { combine(it, value) }
}

override suspend fun finish(current: AtomicInt): Int = current.get()
}

public fun <M> reducer(
initial: () -> M, combine: (M, M) -> M, unordered: Boolean = false,
): Collector<M, M> =
object : CollectorI<Atomic<M>, M, M> {
override val characteristics: Set<Characteristics> =
if (unordered) Characteristics.CONCURRENT_UNORDERED
else emptySet()

override suspend fun supply(): Atomic<M> = Atomic(initial())
override suspend fun accumulate(current: Atomic<M>, value: M) {
current.update { combine(it, value) }
}

override suspend fun finish(current: Atomic<M>): M = current.get()
}

/**
* Returns the "best" value from the value.
*
* @param selectNew Decides whether the new value is "better" than the previous best.
*/
public fun <M> bestBy(
selectNew: (old: M, new: M) -> Boolean,
): Collector<M, M?> =
object : CollectorI<Atomic<M?>, M, M?> {
override val characteristics: Set<Characteristics> = Characteristics.CONCURRENT_UNORDERED
override suspend fun supply(): Atomic<M?> = Atomic(null)
override suspend fun accumulate(current: Atomic<M?>, value: M) {
current.update { old ->
if (old == null) value
else if (selectNew(old, value)) value
else old
}
}

override suspend fun finish(current: Atomic<M?>): M? = current.get()
}

/**
* Collects all the values in a list, in the order in which they are emitted.
*/
public fun <T> list(): Collector<T, List<T>> =
object : CollectorI<MutableList<T>, T, List<T>> {
override val characteristics: Set<Characteristics> = setOf(Characteristics.IDENTITY_FINISH)
override suspend fun supply(): MutableList<T> = mutableListOf()
override suspend fun accumulate(current: MutableList<T>, value: T) {
current.add(value)
}

override suspend fun finish(current: MutableList<T>): List<T> = current
}

/**
* Collects all the values in a set.
*/
public fun <T> set(): Collector<T, Set<T>> =
object : CollectorI<MutableSet<T>, T, Set<T>> {
override val characteristics: Set<Characteristics> =
setOf(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED)

override suspend fun supply(): MutableSet<T> = mutableSetOf()
override suspend fun accumulate(current: MutableSet<T>, value: T) {
current.add(value)
}

override suspend fun finish(current: MutableSet<T>): Set<T> = current
}

/**
* Collects all the values in a map.
*/
public fun <K, V> mapFromEntries(): Collector<Map.Entry<K, V>, Map<K, V>> =
map<K, V>().contramap { (k, v) -> k to v }

/**
* Collects all the values in a map.
*/
public fun <K, V> map(): Collector<Pair<K, V>, Map<K, V>> =
object : CollectorI<MutableMap<K, V>, Pair<K, V>, Map<K, V>> {
override val characteristics: Set<Characteristics> =
setOf(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED)

override suspend fun supply(): MutableMap<K, V> = mutableMapOf()
override suspend fun accumulate(current: MutableMap<K, V>, value: Pair<K, V>) {
current[value.first] = value.second
}

override suspend fun finish(current: MutableMap<K, V>): Map<K, V> = current
}
}
Loading

0 comments on commit b91dc0f

Please sign in to comment.