Skip to content

Commit

Permalink
rewrote Cmd.batchedThrottle to Dispatch.batchThrottled extension
Browse files Browse the repository at this point in the history
because it feels more natural to use it that way with a dispatch inside an ofEffect that produces values rapidly
  • Loading branch information
h0lg committed Sep 10, 2024
1 parent 0cc2a6a commit 63f739a
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 71 deletions.
32 changes: 16 additions & 16 deletions src/Fabulous.Tests/CmdTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ type ``Cmd tests``() =
}

[<Test>]
member _.``Cmd.batchedThrottle dispatches all undispatched values on interval expiry``() =
member _.``Dispatch.batchThrottled dispatches all undispatched values on interval expiry``() =
async {
let mutable messageCount = 0
let mutable dispatched = [] // records dispatched messages latest first
Expand All @@ -162,12 +162,12 @@ type ``Cmd tests``() =
messageCount <- messageCount + 1
dispatched <- msg :: dispatched

let batchedThrottleCmd, _ = Cmd.batchedThrottle 100 NewValues
let batchedThrottleCmd, _ = dispatch.batchThrottled(100, NewValues)

batchedThrottleCmd 1 |> CmdTestsHelper.execute dispatch
batchedThrottleCmd 2 |> CmdTestsHelper.execute dispatch
batchedThrottleCmd 3 |> CmdTestsHelper.execute dispatch
batchedThrottleCmd 4 |> CmdTestsHelper.execute dispatch
batchedThrottleCmd 1
batchedThrottleCmd 2
batchedThrottleCmd 3
batchedThrottleCmd 4

do! Async.Sleep 200 // Wait longer than the throttle interval

Expand All @@ -177,7 +177,7 @@ type ``Cmd tests``() =
}

[<Test>]
member _.``Cmd.batchedThrottle dispatches messages immediately if interval not expired``() =
member _.``Dispatch.batchThrottled dispatches messages immediately if interval not expired``() =
async {
let mutable messageCount = 0
let mutable dispatched = [] // records dispatched messages latest first
Expand All @@ -186,10 +186,10 @@ type ``Cmd tests``() =
messageCount <- messageCount + 1
dispatched <- msg :: dispatched

let batchedThrottleCmd, _ = Cmd.batchedThrottle 100 NewValues
let batchedThrottleCmd, _ = dispatch.batchThrottled(100, NewValues)

batchedThrottleCmd 1 |> CmdTestsHelper.execute dispatch
batchedThrottleCmd 2 |> CmdTestsHelper.execute dispatch
batchedThrottleCmd 1
batchedThrottleCmd 2

// Only the first value should have been dispatched immediately
Assert.AreEqual(1, messageCount)
Expand All @@ -199,8 +199,8 @@ type ``Cmd tests``() =
giving second value time to dispatch and elapsing time until next dispatch *)
do! Async.Sleep 210

batchedThrottleCmd 3 |> CmdTestsHelper.execute dispatch
batchedThrottleCmd 4 |> CmdTestsHelper.execute dispatch
batchedThrottleCmd 3
batchedThrottleCmd 4

// Second value should have dispatched delayed, third immediately
Assert.AreEqual(3, messageCount)
Expand All @@ -214,7 +214,7 @@ type ``Cmd tests``() =
}

[<Test>]
member _.``Cmd.batchedThrottle factory can be awaited for completion``() =
member _.``Dispatch.batchThrottled factory can be awaited for completion``() =
async {
let mutable messageCount = 0
let mutable dispatched = [] // records dispatched messages latest first
Expand All @@ -223,10 +223,10 @@ type ``Cmd tests``() =
messageCount <- messageCount + 1
dispatched <- msg :: dispatched

let createCmd, awaitNextDispatch = Cmd.batchedThrottle 100 NewValues
let createCmd, awaitNextDispatch = dispatch.batchThrottled(100, NewValues)

createCmd 1 |> CmdTestsHelper.execute dispatch
createCmd 2 |> CmdTestsHelper.execute dispatch
createCmd 1
createCmd 2

// Only the first value should have been dispatched immediately
Assert.AreEqual(1, messageCount)
Expand Down
111 changes: 56 additions & 55 deletions src/Fabulous/Cmd.fs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace Fabulous

open System.Runtime.CompilerServices
open System.Threading
open System.Threading.Tasks

Expand Down Expand Up @@ -312,27 +313,28 @@ module Cmd =
cts.Token
)) ]

type DispatchExtensions =

/// <summary>
/// Creates a factory for Commands that dispatch messages with a list of pending values at a fixed maximum rate,
/// ensuring that all pending values are dispatched when the specified interval elapses.
/// This function is similar to <see cref="bufferedThrottle"/>, but instead of dispatching only the last value,
/// it remembers and dispatches all undispatched values within the specified interval.
/// Helpful for scenarios where you want to throttle messages but cannot afford to lose any of the values they carry,
/// ensuring all values are processed at a controlled rate.
/// Creates a throttled dispatch factory that dispatches values in batches at a fixed minimum interval/maximum rate
/// while ensuring that all values are dispatched eventually.
/// This helps throttle the message dispatch of a rapid producer to avoid overloading the MVU loop
/// without dropping any of the carried values - ensuring all values are processed in batches at a controlled rate.
/// Note that this function creates an object with internal state and is intended to be used per Program
/// or longer-running background process rather than once per message in the update function.
/// </summary>
/// <param name="interval">The minimum time interval between two consecutive Command executions in milliseconds.</param>
/// <param name="fn">A function that maps a list of factory input values to a message for dispatch.</param>
/// <param name="interval">The minimum time interval between two consecutive dispatches in milliseconds.</param>
/// <param name="mapBatchToMsg">A function that maps a list of pending input values to a message for dispatch.</param>
/// <returns>
/// Two methods - the first being a Command factory function that maps a list of input values to a Command
/// which dispatches a message (mapped from the pending values),
/// either immediately or after a delay respecting the interval, while remembering and dispatching all remembered values
/// when the interval has elapsed, ensuring no values are lost.
/// The second can be used for awaiting the next dispatch from the outside while adding some buffer time.
/// Two functions. The first has a Dispatch signature and is used to feed a single value into the factory,
/// where it is either dispatched immediately or after a delay respecting the interval,
/// batched with other pending values in the order they were fed in.
/// The second can be used for awaiting the next dispatch from the outside
/// - while optionally adding some buffer time (in milliseconds) to account for race condiditions.
/// </returns>
let batchedThrottle (interval: int) (mapValuesToMsg: 'value list -> 'msg) : ('value -> Cmd<'msg>) * (System.TimeSpan option -> Async<unit>) =
let rateLimit = System.TimeSpan.FromMilliseconds(float interval)
[<Extension>]
static member batchThrottled((dispatch: Dispatch<'msg>), interval, (mapBatchToMsg: 'value list -> 'msg)) =
let rateLimit = System.TimeSpan.FromMilliseconds(interval)
let funLock = obj() // ensures safe access to resources shared across different threads
let mutable lastDispatch = System.DateTime.MinValue
let mutable pendingValues: 'value list = []
Expand All @@ -343,49 +345,48 @@ module Cmd =
lastDispatch.Add(rateLimit) - System.DateTime.UtcNow

// dispatches all pendingValues and resets them while updating lastDispatch
let dispatchBatch (dispatch: 'msg -> unit) =
let dispatchBatch () =
// Dispatch in the order they were received
pendingValues |> List.rev |> mapValuesToMsg |> dispatch
pendingValues |> List.rev |> mapBatchToMsg |> dispatch

lastDispatch <- System.DateTime.UtcNow
pendingValues <- []

// a factory function mapping input values to sleeping Commands dispatching all pending messages
let factory =
// a function with the Dispatch signature for feeding a single value into the throttled batch factory
let dispatchSingle =
fun (value: 'value) ->
[ fun dispatch ->
lock funLock (fun () ->
let untilNextDispatch = getTimeUntilNextDispatch()
pendingValues <- value :: pendingValues

// If the interval has elapsed since the last dispatch, dispatch all pending messages
if untilNextDispatch <= System.TimeSpan.Zero then
dispatchBatch dispatch
else // schedule dispatch

// if the the last sleeping dispatch can still be cancelled, do so
if cts <> null then
cts.Cancel()
cts.Dispose()

// used to enable cancelling this dispatch if newer values come into the factory
cts <- new CancellationTokenSource()

Async.Start(
async {
// wait only as long as we have to before next dispatch
do! Async.Sleep(untilNextDispatch)

lock funLock (fun () ->
dispatchBatch dispatch

// done; invalidate own cancellation
if cts <> null then
cts.Dispose()
cts <- null)
},
cts.Token
)) ]
lock funLock (fun () ->
let untilNextDispatch = getTimeUntilNextDispatch()
pendingValues <- value :: pendingValues

// If the interval has elapsed since the last dispatch, dispatch all pending messages
if untilNextDispatch <= System.TimeSpan.Zero then
dispatchBatch()
else // schedule dispatch

// if the the last sleeping dispatch can still be cancelled, do so
if cts <> null then
cts.Cancel()
cts.Dispose()

// used to enable cancelling this dispatch if newer values come into the factory
cts <- new CancellationTokenSource()

Async.Start(
async {
// wait only as long as we have to before next dispatch
do! Async.Sleep(untilNextDispatch)

lock funLock (fun () ->
dispatchBatch()

// done; invalidate own cancellation
if cts <> null then
cts.Dispose()
cts <- null)
},
cts.Token
))

// a function to wait until after the next async dispatch + some buffer time to ensure the dispatch is complete
let awaitNextDispatch buffer =
Expand All @@ -395,12 +396,12 @@ module Cmd =
let untilAfterNextDispatch =
getTimeUntilNextDispatch()
+ match buffer with
| Some value -> value
| Some value -> System.TimeSpan.FromMilliseconds(value)
| None -> System.TimeSpan.Zero

if untilAfterNextDispatch > System.TimeSpan.Zero then
do! Async.Sleep(untilAfterNextDispatch)
})

// return both the factory and the await helper
factory, awaitNextDispatch
// return both the dispatch and the await helper
dispatchSingle, awaitNextDispatch

0 comments on commit 63f739a

Please sign in to comment.