Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

data: Schedule #733

Merged
merged 9 commits into from
Oct 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 6 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2038,30 +2038,17 @@ import scala.concurrent.duration.*
val unreliableComputation: Int < Abort[Exception] =
Abort.catching[Exception](throw new Exception("Temporary failure"))

// Customize retry policy
val customPolicy = Retry.Policy.default
.limit(5)
.exponential(100.millis, maxBackoff = 5.seconds)
// Customize retry schedule
val shedule =
Schedule.exponentialBackoff(initial = 100.millis, factor = 2, maxBackoff = 5.seconds)
.take(5)

val a: Int < (Abort[Exception] & Async) =
Retry[Exception](customPolicy)(unreliableComputation)
Retry[Exception](shedule)(unreliableComputation)

// Use a custom policy builder
val b: Int < (Abort[Exception] & Async) =
Retry[Exception] { policy =>
policy
.limit(10)
.backoff(attempt => (attempt * 100).millis)
}(unreliableComputation)
```

The `Retry` effect automatically adds the `Async` effect to handle the backoff delays between retry attempts. The `Policy` class allows for fine-tuning of the retry behavior:

- `limit`: Sets the maximum number of retry attempts.
- `exponential`: Configures exponential backoff with a starting delay and optional maximum delay.
- `backoff`: Allows for custom backoff strategies based on the attempt number.

`Retry` will continue attempting the computation until it succeeds, the retry limit is reached, or an unhandled exception is thrown. If all retries fail, the last failure is propagated.
The `Retry` effect automatically adds the `Async` effect to handle the provided `Schedule`. `Retry` will continue attempting the computation until it succeeds, the retry schedule is done, or an unhandled exception is thrown. If all retries fail, the last failure is propagated.

### Queue: Concurrent Queuing

Expand Down
29 changes: 10 additions & 19 deletions kyo-combinators/shared/src/main/scala/kyo/Combinators.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,13 @@ extension [A, S](effect: A < S)
* @return
* A computation that produces the result of the last execution
*/
def repeat(policy: Retry.Policy)(using Flat[A], Frame): A < (S & Async) =
Loop.indexed { i =>
if i >= policy.limit then effect.map(Loop.done)
else effect.delayed(policy.backoff(i)).as(Loop.continue)
def repeat(schedule: Schedule)(using Flat[A], Frame): A < (S & Async) =
Loop(schedule) { schedule =>
schedule.next.map { (delay, nextSchedule) =>
effect.delayed(delay).as(Loop.continue(nextSchedule))
}.getOrElse {
effect.map(Loop.done)
}
}

/** Performs this computation repeatedly with a limit.
Expand Down Expand Up @@ -186,8 +189,8 @@ extension [A, S](effect: A < S)
* @return
* A computation that produces the result of this computation with Async and Abort[Throwable] effects
*/
def retry(policy: Retry.Policy)(using Flat[A], Frame): A < (S & Async & Abort[Throwable]) =
Retry[Throwable](policy)(effect)
def retry(schedule: Schedule)(using Flat[A], Frame): A < (S & Async & Abort[Throwable]) =
Retry[Throwable](schedule)(effect)

/** Performs this computation repeatedly with a limit in case of failures.
*
Expand All @@ -197,19 +200,7 @@ extension [A, S](effect: A < S)
* A computation that produces the result of this computation with Async and Abort[Throwable] effects
*/
def retry(n: Int)(using Flat[A], Frame): A < (S & Async & Abort[Throwable]) =
Retry[Throwable](Retry.Policy(_ => Duration.Zero, n))(effect)

/** Performs this computation repeatedly with a backoff policy and a limit in case of failures.
*
* @param backoff
* The backoff policy to use
* @param limit
* The limit to use
* @return
* A computation that produces the result of this computation with Async and Abort[Throwable] effects
*/
def retry(backoff: Int => Duration, n: Int)(using Flat[A], Frame): A < (S & Async & Abort[Throwable]) =
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed the "raw" method to reduce API surface

Retry[Throwable](Retry.Policy(backoff, n))(effect)
Retry[Throwable](Schedule.repeat(n))(effect)

/** Performs this computation indefinitely.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,9 @@ class EffectCombinatorTest extends Test:

"repeat with policy" - {
"repeat with custom policy" in run {
var count = 0
val policy = Retry.Policy(_ => Duration.Zero, 3)
val effect = IO { count += 1; count }.repeat(policy)
var count = 0
val schedule = Schedule.repeat(3)
val effect = IO { count += 1; count }.repeat(schedule)
Async.run(effect).map(_.toFuture).map { handled =>
handled.map { v =>
assert(v == 4)
Expand Down Expand Up @@ -321,7 +321,7 @@ class EffectCombinatorTest extends Test:
"retry with policy" - {
"successful after retries with custom policy" in run {
var count = 0
val policy = Retry.Policy(_ => 10.millis, 3)
val policy = Schedule.fixed(10.millis).take(3)
val effect = IO {
count += 1
if count < 3 then throw new Exception("Retry")
Expand All @@ -333,20 +333,6 @@ class EffectCombinatorTest extends Test:
}
}

"retry with backoff and limit" - {
"successful after retries with exponential backoff" in run {
var count = 0
val backoff = (i: Int) => Math.pow(2, i).toLong.millis
val effect = IO {
count += 1
if count < 3 then throw new Exception("Retry")
else count
}.retry(backoff, 3)
Async.run(effect).map(_.toFuture).map { handled =>
handled.map(v => assert(v == 3))
}
}
}
}

"explicitThrowable" - {
Expand Down
80 changes: 8 additions & 72 deletions kyo-core/shared/src/main/scala/kyo/Retry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,74 +6,12 @@ import scala.util.*
/** Provides utilities for retrying operations with customizable policies. */
object Retry:

/** Represents a retry policy with backoff strategy and attempt limit. */
final case class Policy(backoff: Int => Duration, limit: Int):

/** Creates an exponential backoff strategy.
*
* @param startBackoff
* The initial backoff duration.
* @param factor
* The multiplier for each subsequent backoff.
* @param maxBackoff
* The maximum backoff duration.
* @return
* A new Policy with exponential backoff.
*/
def exponential(
startBackoff: Duration,
factor: Int = 2,
maxBackoff: Duration = Duration.Infinity
): Policy =
backoff { i =>
(startBackoff * factor * (i + 1)).min(maxBackoff)
}

/** Sets a custom backoff function.
*
* @param f
* A function that takes the attempt number and returns a Duration.
* @return
* A new Policy with the custom backoff function.
*/
def backoff(f: Int => Duration): Policy =
copy(backoff = f)

/** Sets the maximum number of retry attempts.
*
* @param v
* The maximum number of attempts.
* @return
* A new Policy with the specified attempt limit.
*/
def limit(v: Int): Policy =
copy(limit = v)
end Policy

object Policy:
/** The default retry policy with no backoff and 3 attempts. */
val default = Policy(_ => Duration.Zero, 3)
/** The default retry schedule. */
val defaultSchedule = Schedule.exponentialBackoff(initial = 100.millis, factor = 2, maxBackoff = 5.seconds).take(3)

/** Provides retry operations for a specific error type. */
final class RetryOps[E >: Nothing](dummy: Unit) extends AnyVal:

/** Retries an operation using the specified policy.
*
* @param policy
* The retry policy to use.
* @param v
* The operation to retry.
* @return
* The result of the operation, or an abort if all retries fail.
*/
def apply[A: Flat, S](policy: Policy)(v: => A < S)(
using
SafeClassTag[E],
Tag[E],
Frame
): A < (Async & Abort[E] & S) =
apply(_ => policy)(v)

/** Retries an operation using a custom policy builder.
*
* @param builder
Expand All @@ -83,21 +21,19 @@ object Retry:
* @return
* The result of the operation, or an abort if all retries fail.
*/
def apply[A: Flat, S](builder: Policy => Policy)(v: => A < (Abort[E] & S))(
def apply[A: Flat, S](schedule: Schedule)(v: => A < (Abort[E] & S))(
using
SafeClassTag[E],
Tag[E],
Frame
): A < (Async & Abort[E] & S) =
val b = builder(Policy.default)
Loop.indexed { attempt =>
Loop(schedule) { schedule =>
Abort.run[E](v).map(_.fold { r =>
if attempt < b.limit then
Async.sleep(b.backoff(attempt)).andThen {
Loop.continue
}
else
schedule.next.map { (delay, nextSchedule) =>
Async.delay(delay)(Loop.continue(nextSchedule))
}.getOrElse {
Abort.get(r)
}
}(Loop.done(_)))
}
end apply
Expand Down
10 changes: 5 additions & 5 deletions kyo-core/shared/src/test/scala/kyo/RetryTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ class RetryTest extends Test:
"no retries" - {
"ok" in run {
var calls = 0
Retry[Any](_.limit(0)) {
Retry[Any](Schedule.never) {
calls += 1
42
}.map { v =>
Expand All @@ -17,7 +17,7 @@ class RetryTest extends Test:
"nok" in run {
var calls = 0
Abort.run[Exception] {
Retry[Exception](_.limit(0)) {
Retry[Exception](Schedule.never) {
calls += 1
throw ex
}
Expand All @@ -30,7 +30,7 @@ class RetryTest extends Test:
"retries" - {
"ok" in run {
var calls = 0
Retry[Any](_.limit(3)) {
Retry[Any](Schedule.repeat(3)) {
calls += 1
42
}.map { v =>
Expand All @@ -40,7 +40,7 @@ class RetryTest extends Test:
"nok" in run {
var calls = 0
Abort.run[Exception] {
Retry[Exception](_.limit(3)) {
Retry[Exception](Schedule.repeat(3)) {
calls += 1
throw ex
}
Expand All @@ -54,7 +54,7 @@ class RetryTest extends Test:
var calls = 0
val start = java.lang.System.currentTimeMillis()
Abort.run[Exception] {
Retry[Exception](_.limit(4).exponential(1.milli)) {
Retry[Exception](Schedule.exponentialBackoff(1.milli, 2.0, Duration.Infinity).take(4)) {
calls += 1
throw ex
}
Expand Down
4 changes: 4 additions & 0 deletions kyo-data/shared/src/main/scala/kyo/Duration.scala
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ object Duration:
val sum: Long = self.toLong + that.toLong
if sum >= 0 then sum else Duration.Infinity

inline infix def -(that: Duration): Duration =
val diff: Long = self.toLong - that.toLong
if diff > 0 then diff else Duration.Zero

inline infix def *(factor: Double): Duration =
if factor <= 0 || self.toLong <= 0L then Duration.Zero
else if factor <= Long.MaxValue / self.toLong.toDouble then Math.round(self.toLong.toDouble * factor)
Expand Down
18 changes: 18 additions & 0 deletions kyo-data/shared/src/main/scala/kyo/Instant.scala
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,24 @@ object Instant:
def truncatedTo(unit: Duration.Units & Duration.Truncatable): Instant =
instant.truncatedTo(unit.chronoUnit)

/** Returns the minimum of this Instant and another.
*
* @param other
* The other Instant to compare with.
* @return
* The earlier of the two Instants.
*/
infix def min(other: Instant): Instant = if instant.isBefore(other) then instant else other

/** Returns the maximum of this Instant and another.
*
* @param other
* The other Instant to compare with.
* @return
* The later of the two Instants.
*/
infix def max(other: Instant): Instant = if instant.isAfter(other) then instant else other

/** Converts this Instant to a human-readable ISO-8601 formatted string.
*
* @return
Expand Down
Loading
Loading