Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schedule a list of automata efficiently #343

Merged
merged 2 commits into from
Oct 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions automaton/src/Data/Automaton.hs
Original file line number Diff line number Diff line change
Expand Up @@ -451,8 +451,13 @@ handleAutomaton_ f = Automaton . StreamOptimized.withOptimized f . getAutomaton
handleAutomaton :: (Monad m) => (StreamT (ReaderT a m) b -> StreamT (ReaderT c n) d) -> Automaton m a b -> Automaton n c d
handleAutomaton f = Automaton . StreamOptimized.handleOptimized f . getAutomaton

-- | Buffer the output of an automaton. See 'Data.Stream.concatS'.
concatS :: (Monad m) => Automaton m () [b] -> Automaton m () b
{- | Buffer the output of an automaton. See 'Data.Stream.concatS'.

The input for the automaton is not buffered.
For example, if @'concatS' automaton@ receives one input @a@ and @automaton@ produces 10 @b@s from it,
then the next 9 inputs will be ignored.
-}
concatS :: (Monad m) => Automaton m a [b] -> Automaton m a b
concatS (Automaton automaton) = Automaton $ Data.Stream.Optimized.concatS automaton

-- * Examples
Expand Down
2 changes: 2 additions & 0 deletions rhine/rhine.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ library
other-modules:
FRP.Rhine.ClSF.Except.Util
FRP.Rhine.ClSF.Random.Util
FRP.Rhine.Schedule.Internal

-- LANGUAGE extensions used by modules in this package.
-- other-extensions:
Expand All @@ -146,6 +147,7 @@ library
MonadRandom >=0.5,
containers >=0.5,
deepseq >=1.4,
foldable1-classes-compat ^>=0.1,
free >=5.1,
mmorph ^>=1.2,
profunctors ^>=5.6,
Expand Down
34 changes: 4 additions & 30 deletions rhine/src/FRP/Rhine/Schedule.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,16 @@ module FRP.Rhine.Schedule where
import Control.Arrow
import Data.List.NonEmpty as N

-- transformers
import Control.Monad.Trans.Reader

-- monad-schedule
import Control.Monad.Schedule.Class

-- automaton
import Data.Automaton
import Data.Automaton.Recursive (getRecursive, toRecursive)
import Data.Stream
import Data.Stream.Optimized (OptimizedStreamT (..), toStreamT)
import Data.Stream.Recursive qualified as StreamRecursive
import Data.Stream.Result

-- rhine
import FRP.Rhine.Clock
import FRP.Rhine.Schedule.Internal

-- * Scheduling

Expand All @@ -48,35 +42,15 @@ scheduleList :: (Monad m, MonadSchedule m) => NonEmpty (Automaton m a b) -> Auto
scheduleList automatons0 =
Automaton $
Stateful $
StreamT
{ state = (getRecursive . toRecursive <$> automatons0, [])
, step = \(automatons, running) -> ReaderT $ \a -> do
let bsAndConts = flip (runReaderT . StreamRecursive.getRecursive) a <$> automatons
(done, running') <- schedule (N.head bsAndConts :| N.tail bsAndConts ++ running)
return $ Result (resultState <$> done, running') $ output <$> done
}
scheduleStreams' $
toStreamT . getAutomaton <$> automatons0

{- | Run two automata concurrently.

Whenever one automaton returns a value, it is returned.

This is similar to 'scheduleList', but more efficient.
-}
schedulePair :: (Monad m, MonadSchedule m) => Automaton m a b -> Automaton m a b -> Automaton m a b
schedulePair (Automaton automatonL) (Automaton automatonR) = Automaton $! Stateful $! scheduleStreams (toStreamT automatonL) (toStreamT automatonR)
where
scheduleStreams :: (Monad m, MonadSchedule m) => StreamT m b -> StreamT m b -> StreamT m b
scheduleStreams (StreamT stateL0 stepL) (StreamT stateR0 stepR) =
StreamT
{ state = (stepL stateL0, stepR stateR0)
, step
}
where
step (runningL, runningR) = do
result <- race runningL runningR
case result of
Left (Result stateL' b, runningR') -> return $ Result (stepL stateL', runningR') b
Right (runningL', Result stateR' b) -> return $ Result (runningL', stepR stateR') b
schedulePair automatonL automatonR = concatS $ fmap toList $ scheduleList $ automatonL :| [automatonR]

-- | Run two running clocks concurrently.
runningSchedule ::
Expand Down
85 changes: 85 additions & 0 deletions rhine/src/FRP/Rhine/Schedule/Internal.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
{-# LANGUAGE ExistentialQuantification #-}

module FRP.Rhine.Schedule.Internal where

-- base
import Control.Arrow
import Data.Function ((&))
import Data.Functor ((<&>))
import Data.Functor.Compose (Compose (..))
import Data.Kind (Type)
import Data.List.NonEmpty as N

-- foldable1-classes-compat
import Data.Foldable1 (Foldable1 (foldrMap1))

-- sop-core
import Data.SOP (HCollapse (hcollapse), HSequence (htraverse'), I (..), K (K), NP (..), NS (..), SListI, apInjs_NP, hliftA, hzipWith, unI)

-- monad-schedule
import Control.Monad.Schedule.Class

-- automaton
import Data.Stream hiding (concatS)
import Data.Stream.Result

-- | One step of a stream, with the state type argument going last, so it is usable with sop-core.
newtype Step m b state = Step {getStep :: ResultStateT state m b}

-- | The result of a stream, with the type arguments swapped, so it's usable with sop-core
newtype RunningResult b state = RunningResult {getRunningResult :: Result state b}

-- | Transform an n-ary product of at least one type into a nonempty list of all its content.
apInjs_NPNonEmpty :: (SListI xs) => NP f (x ': xs) -> NonEmpty (NS f (x ': xs))

Check warning on line 33 in rhine/src/FRP/Rhine/Schedule/Internal.hs

View workflow job for this annotation

GitHub Actions / Run hlint

Suggestion in apInjs_NPNonEmpty in module FRP.Rhine.Schedule.Internal: Use camelCase ▫︎ Found: "apInjs_NPNonEmpty ::\n (SListI xs) => NP f (x ': xs) -> NonEmpty (NS f (x ': xs))" ▫︎ Perhaps: "apInjsNPNonEmpty ::\n (SListI xs) => NP f (x ': xs) -> NonEmpty (NS f (x ': xs))"
apInjs_NPNonEmpty (fx :* fxs) = Z fx :| (S <$> apInjs_NP fxs)

Check warning on line 34 in rhine/src/FRP/Rhine/Schedule/Internal.hs

View workflow job for this annotation

GitHub Actions / Run hlint

Suggestion in apInjs_NPNonEmpty in module FRP.Rhine.Schedule.Internal: Use camelCase ▫︎ Found: "apInjs_NPNonEmpty (fx :* fxs) = ..." ▫︎ Perhaps: "apInjsNPNonEmpty (fx :* fxs) = ..."

-- | A nonempty list of 'StreamT's, unzipped into their states and their steps.
data Streams m b = forall state (states :: [Type]).
(SListI states) =>
Streams
{ states :: NP I (state ': states)
, steps :: NP (Step m b) (state ': states)
}

-- | Run 'Streams' concurrently by scheduling them in 'MonadSchedule'.
scheduleStreams :: (MonadSchedule m, Functor m, Applicative m) => Streams m b -> StreamT m (NonEmpty b)
scheduleStreams Streams {states, steps} =
StreamT
{ state = (apInjs_NPNonEmpty states, []) -- All the initial states and no currently running continuations
, step =
-- Some streams have not started yet, or just finished their step. Others are still running.
\(restingStates, runningStreams) ->
-- Start all currently not running streams by zipping each with its step
fmap (htraverse' getCompose . hzipWith (\Step {getStep} -> Compose . fmap RunningResult . getResultStateT getStep . unI) steps) restingStates
-- Append all already running states to the freshly started ones
& flip appendList runningStreams
-- Schedule all running streams concurrently
& schedule
-- Separate into finished streams and still running streams
& fmap
( \(finished, running) ->
let finishedStates = finished <&> (hliftA (getRunningResult >>> resultState >>> I))

Check warning on line 61 in rhine/src/FRP/Rhine/Schedule/Internal.hs

View workflow job for this annotation

GitHub Actions / Run hlint

Suggestion in scheduleStreams in module FRP.Rhine.Schedule.Internal: Redundant bracket ▫︎ Found: "finished <&> (hliftA (getRunningResult >>> resultState >>> I))" ▫︎ Perhaps: "finished <&> hliftA (getRunningResult >>> resultState >>> I)"
outputs =
finished
<&> (hliftA (getRunningResult >>> output >>> K) >>> hcollapse)
in Result (finishedStates, running) outputs
)
}

-- | Run a nonempty list of streams concurrently.
scheduleStreams' :: (MonadSchedule m, Applicative m) => NonEmpty (StreamT m b) -> StreamT m (NonEmpty b)
scheduleStreams' = scheduleStreams . foldrMap1 buildStreams consStreams
where
buildStreams :: StreamT m b -> Streams m b
buildStreams StreamT {state, step} =
Streams
{ states = I state :* Nil
, steps = Step (ResultStateT step) :* Nil
}

consStreams :: StreamT m b -> Streams m b -> Streams m b
consStreams StreamT {state, step} Streams {states, steps} =
Streams
{ states = I state :* states
, steps = Step (ResultStateT step) :* steps
}