Skip to content

Commit

Permalink
Simplify ResourceScope interface to a single method.
Browse files Browse the repository at this point in the history
Make resourceScope and use inline.
  • Loading branch information
kyay10 committed Nov 1, 2024
1 parent c6cc54d commit 42537c4
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package arrow

import arrow.atomic.Atomic
import arrow.atomic.update
import kotlin.coroutines.cancellation.CancellationException
import arrow.atomic.value

/**
* The [AutoCloseScope] DSL allows for elegantly working with close-ables,
Expand Down Expand Up @@ -66,10 +66,6 @@ public inline fun <A> autoCloseScope(block: AutoCloseScope.() -> A): A {
var finalized = false
return try {
block(scope)
} catch (e: CancellationException) {
finalized = true
scope.close(e)
throw e
} catch (e: Throwable) {
finalized = true
scope.close(e.throwIfFatal())
Expand All @@ -85,7 +81,7 @@ public interface AutoCloseScope {
public fun onClose(release: (Throwable?) -> Unit)

public fun <A : AutoCloseable> install(autoCloseable: A): A =
autoClose({ autoCloseable }) { a, _ -> a.close() }
autoCloseable.also { onClose { autoCloseable.close() } }
}

public inline fun <A> AutoCloseScope.autoClose(
Expand All @@ -102,8 +98,8 @@ internal class DefaultAutoCloseScope : AutoCloseScope {
}

fun close(error: Throwable?) {
finalizers.get().asReversed().fold(error) { acc, function ->
acc.add(runCatching { function.invoke(error) }.exceptionOrNull())
finalizers.value.asReversed().fold(error) { acc, finalizer ->
acc.add(runCatching { finalizer(error) }.exceptionOrNull())
}?.let { throw it }
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package arrow

import kotlin.coroutines.cancellation.CancellationException

@PublishedApi
internal actual fun Throwable.throwIfFatal(): Throwable =
when(this) {
is VirtualMachineError, is ThreadDeath, is InterruptedException, is LinkageError, is CancellationException -> throw this
is VirtualMachineError, is ThreadDeath, is InterruptedException, is LinkageError -> throw this
else -> this
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ public sealed class ExitCase {
}
}

internal val ExitCase.errorOrNull
get() = when (this) {
ExitCase.Completed -> null
is ExitCase.Cancelled -> exception
is ExitCase.Failure -> failure
}

/**
* Registers an [onCancel] handler after [fa].
* [onCancel] is guaranteed to be called in case of cancellation, otherwise it's ignored.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
package arrow.fx.coroutines

import arrow.AutoCloseScope
import arrow.atomic.update
import arrow.atomic.Atomic
import arrow.atomic.update
import arrow.atomic.value
import arrow.core.identity
import arrow.core.prependTo
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.withContext
import kotlin.jvm.JvmInline

@DslMarker
public annotation class ScopeDSL
Expand Down Expand Up @@ -176,6 +173,7 @@ public annotation class ResourceDSL
* <!--- INCLUDE
* import arrow.fx.coroutines.ResourceScope
* import arrow.fx.coroutines.Resource
* import arrow.fx.coroutines.install
* import arrow.fx.coroutines.resource
* import kotlinx.coroutines.Dispatchers
* import kotlinx.coroutines.withContext
Expand Down Expand Up @@ -227,6 +225,7 @@ public annotation class ResourceDSL
*
* <!--- INCLUDE
* import arrow.fx.coroutines.ResourceScope
* import arrow.fx.coroutines.install
* import arrow.fx.coroutines.resourceScope
* import arrow.fx.coroutines.parZip
* import kotlinx.coroutines.Dispatchers
Expand Down Expand Up @@ -293,36 +292,35 @@ public interface ResourceScope : AutoCloseScope {
* All [release] functions [install]ed into the [Resource] lambda will be installed in this [ResourceScope] while respecting the FIFO order.
*/
@ResourceDSL
public suspend fun <A> Resource<A>.bind(): A
/**
* Install [A] into the [ResourceScope].
* Its [release] function will be called with the appropriate [ExitCase] if this [ResourceScope] finishes.
* It results either in [ExitCase.Completed], [ExitCase.Cancelled] or [ExitCase.Failure] depending on the terminal state of [Resource] lambda.
*/
@ResourceDSL
public suspend fun <A> install(
acquire: suspend AcquireStep.() -> A,
release: suspend (A, ExitCase) -> Unit,
): A
public suspend fun <A> Resource<A>.bind(): A = this()

/** Composes a [release] action to a [Resource] value before binding. */
@ResourceDSL
public suspend infix fun <A> Resource<A>.release(release: suspend (A) -> Unit): A {
val a = bind()
return install({ a }) { aa, _ -> release(aa) }
}
public suspend infix fun <A> Resource<A>.release(release: suspend (A) -> Unit): A =
bind().also { a -> onRelease { release(a) } }

/** Composes a [releaseCase] action to a [Resource] value before binding. */
@ResourceDSL
public suspend infix fun <A> Resource<A>.releaseCase(release: suspend (A, ExitCase) -> Unit): A {
val a = bind()
return install({ a }, release)
}
public suspend infix fun <A> Resource<A>.releaseCase(release: suspend (A, ExitCase) -> Unit): A =
bind().also { a -> onRelease { release(a, it) } }

override fun onClose(release: (Throwable?) -> Unit): Unit = onRelease { release(it.errorOrNull) }

public suspend infix fun onRelease(release: suspend (ExitCase) -> Unit): Unit =
install({ }) { _, exitCase -> release(exitCase) }
public infix fun onRelease(release: suspend (ExitCase) -> Unit)
}


/**
* Install [A] into the [ResourceScope].
* Its [release] function will be called with the appropriate [ExitCase] if this [ResourceScope] finishes.
* It results either in [ExitCase.Completed], [ExitCase.Cancelled] or [ExitCase.Failure] depending on the terminal state of [Resource] lambda.
*/
@ResourceDSL
public inline fun <A> ResourceScope.install(
acquire: AcquireStep.() -> A,
crossinline release: suspend (A, ExitCase) -> Unit,
): A = acquire(AcquireStep).also { a -> onRelease { release(a, it) } }

@ScopeDSL
public fun <A> resource(block: suspend ResourceScope.() -> A): Resource<A> = block

Expand All @@ -332,6 +330,7 @@ public fun <A> resource(block: suspend ResourceScope.() -> A): Resource<A> = blo
* upon [ExitCase.Completed], [ExitCase.Cancelled] or [ExitCase.Failure] runs all the `release` finalizers.
*
* <!--- INCLUDE
* import arrow.fx.coroutines.install
* import arrow.fx.coroutines.resourceScope
* import kotlinx.coroutines.Dispatchers
* import kotlinx.coroutines.withContext
Expand All @@ -354,30 +353,30 @@ public fun <A> resource(block: suspend ResourceScope.() -> A): Resource<A> = blo
* <!--- KNIT example-resource-06.kt -->
*/
@ScopeDSL
public suspend fun <A> resourceScope(action: suspend ResourceScope.() -> A): A {
public suspend inline fun <A> resourceScope(action: suspend ResourceScope.() -> A): A {
val scope = ResourceScopeImpl()
val a: A = try {
var finished = false
return try {
action(scope)
} catch (e: Throwable) {
val ex = if (e is CancellationException) ExitCase.Cancelled(e) else ExitCase.Failure(e)
val ee = withContext(NonCancellable) {
scope.cancelAll(ex, e) ?: e
finished = true
scope.cancelAll(ExitCase.ExitCase(e))
throw e
} finally {
if (!finished) {
scope.cancelAll(ExitCase.Completed)
}
throw ee
}
withContext(NonCancellable) {
scope.cancelAll(ExitCase.Completed)?.let { throw it }
}
return a
}

public suspend infix fun <A, B> Resource<A>.use(f: suspend (A) -> B): B =
public suspend inline infix fun <A, B> Resource<A>.use(f: suspend (A) -> B): B =
resourceScope { f(bind()) }

/**
* Construct a [Resource] from an allocating function [acquire] and a release function [release].
*
* ```kotlin
* import arrow.fx.coroutines.install
* import arrow.fx.coroutines.resource
* import arrow.fx.coroutines.resourceScope
*
Expand Down Expand Up @@ -469,72 +468,33 @@ public fun <A> Resource<A>.asFlow(): Flow<A> =
* This API is useful for building inter-op APIs between [Resource] and non-suspending code, such as Java libraries.
*/
@DelicateCoroutinesApi
public suspend fun <A> Resource<A>.allocated(): Pair<A, suspend (ExitCase) -> Unit> {
val effect = ResourceScopeImpl()
val allocated: A = invoke(effect)
val release: suspend (ExitCase) -> Unit = { e ->
val suppressed: Throwable? = effect.cancelAll(e)
val original: Throwable? = when(e) {
ExitCase.Completed -> null
is ExitCase.Cancelled -> e.exception
is ExitCase.Failure -> e.failure
}
original?.apply {
suppressed?.let(::addSuppressed)
}?.let { throw it }
}
return Pair(allocated, release)
public suspend fun <A> Resource<A>.allocated(): Pair<A, suspend (ExitCase) -> Unit> = with(ResourceScopeImpl()) {
bind() to this::cancelAll
}

@JvmInline
private value class ResourceScopeImpl(
private val finalizers: Atomic<List<suspend (ExitCase) -> Unit>> = Atomic(emptyList()),
) : ResourceScope {
override suspend fun <A> Resource<A>.bind(): A = invoke(this@ResourceScopeImpl)

override suspend fun <A> install(acquire: suspend AcquireStep.() -> A, release: suspend (A, ExitCase) -> Unit): A =
bracketCase({
val a = acquire(AcquireStep)
val finalizer: suspend (ExitCase) -> Unit = { ex: ExitCase -> release(a, ex) }
finalizers.update(finalizer::prependTo)
a
}, ::identity, { a, ex ->
// Only if ExitCase.Failure, or ExitCase.Cancelled during acquire we cancel
// Otherwise we've saved the finalizer, and it will be called from somewhere else.
if (ex != ExitCase.Completed) {
val e = cancelAll(ex)
val e2 = kotlin.runCatching { release(a, ex) }.exceptionOrNull()
e?.apply {
e2?.let(::addSuppressed)
}?.let { throw it }
}
})

override fun onClose(release: (Throwable?) -> Unit) {
val finalizer: suspend (ExitCase) -> Unit = { exitCase ->
release(exitCase.errorOrNull)
}
finalizers.update(finalizer::prependTo)
@PublishedApi
internal class ResourceScopeImpl : ResourceScope {
private val finalizers: Atomic<List<suspend (ExitCase) -> Unit>> = Atomic(emptyList())
override fun onRelease(release: suspend (ExitCase) -> Unit) {
finalizers.update(release::prependTo)
}

private val ExitCase.errorOrNull get() = when (this) {
ExitCase.Completed -> null
is ExitCase.Cancelled -> exception
is ExitCase.Failure -> failure
suspend fun cancelAll(exitCase: ExitCase) {
withContext(NonCancellable) {
finalizers.value.fold(exitCase.errorOrNull) { acc, finalizer ->
acc.add(runCatching { finalizer(exitCase) }.exceptionOrNull())
}
}?.let { throw it }
}

suspend fun cancelAll(
exitCase: ExitCase,
first: Throwable? = null,
): Throwable? = finalizers.value.fold(first) { acc, finalizer ->
val other = kotlin.runCatching { finalizer(exitCase) }.exceptionOrNull()
other?.let {
acc?.apply { addSuppressed(other) } ?: other
} ?: acc
}
private fun Throwable?.add(other: Throwable?): Throwable? =
this?.apply {
other?.let { addSuppressed(it) }
} ?: other
}

/** Platform-dependent IO [CoroutineDispatcher] **/
@PublishedApi
internal expect val IODispatcher: CoroutineDispatcher

/**
Expand All @@ -555,14 +515,14 @@ internal expect val IODispatcher: CoroutineDispatcher
* <!--- KNIT example-resource-10.kt -->
*/
@ResourceDSL
public suspend fun <A : AutoCloseable> ResourceScope.autoCloseable(
public inline fun <A : AutoCloseable> ResourceScope.autoCloseable(
closingDispatcher: CoroutineDispatcher = IODispatcher,
autoCloseable: suspend () -> A,
): A = install({ autoCloseable() } ) { s: A, _: ExitCase -> withContext(closingDispatcher) { s.close() } }
autoCloseable: () -> A,
): A = autoCloseable().also { s -> onRelease { withContext(closingDispatcher) { s.close() } } }

public fun <A : AutoCloseable> autoCloseable(
closingDispatcher: CoroutineDispatcher = IODispatcher,
autoCloseable: suspend () -> A,
): Resource<A> = resource {
autoCloseable(closingDispatcher, autoCloseable)
autoCloseable(closingDispatcher) { autoCloseable() }
}
Loading

0 comments on commit 42537c4

Please sign in to comment.