Skip to content

Commit

Permalink
Schedule an arbitrary nonempty list of automata
Browse files Browse the repository at this point in the history
  • Loading branch information
turion committed Oct 27, 2024
1 parent bb8c5bf commit 927e67c
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 30 deletions.
2 changes: 2 additions & 0 deletions rhine/rhine.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ library
other-modules:
FRP.Rhine.ClSF.Except.Util
FRP.Rhine.ClSF.Random.Util
FRP.Rhine.Schedule.Internal

-- LANGUAGE extensions used by modules in this package.
-- other-extensions:
Expand All @@ -146,6 +147,7 @@ library
MonadRandom >=0.5,
containers >=0.5,
deepseq >=1.4,
foldable1-classes-compat ^>=0.1,
free >=5.1,
mmorph ^>=1.2,
profunctors ^>=5.6,
Expand Down
34 changes: 4 additions & 30 deletions rhine/src/FRP/Rhine/Schedule.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,16 @@ module FRP.Rhine.Schedule where
import Control.Arrow
import Data.List.NonEmpty as N

-- transformers
import Control.Monad.Trans.Reader

-- monad-schedule
import Control.Monad.Schedule.Class

-- automaton
import Data.Automaton
import Data.Automaton.Recursive (getRecursive, toRecursive)
import Data.Stream
import Data.Stream.Optimized (OptimizedStreamT (..), toStreamT)
import Data.Stream.Recursive qualified as StreamRecursive
import Data.Stream.Result

-- rhine
import FRP.Rhine.Clock
import FRP.Rhine.Schedule.Internal

-- * Scheduling

Expand All @@ -48,35 +42,15 @@ scheduleList :: (Monad m, MonadSchedule m) => NonEmpty (Automaton m a b) -> Auto
scheduleList automatons0 =
Automaton $
Stateful $
StreamT
{ state = (getRecursive . toRecursive <$> automatons0, [])
, step = \(automatons, running) -> ReaderT $ \a -> do
let bsAndConts = flip (runReaderT . StreamRecursive.getRecursive) a <$> automatons
(done, running') <- schedule (N.head bsAndConts :| N.tail bsAndConts ++ running)
return $ Result (resultState <$> done, running') $ output <$> done
}
scheduleStreams' $
toStreamT . getAutomaton <$> automatons0

{- | Run two automata concurrently.
Whenever one automaton returns a value, it is returned.
This is similar to 'scheduleList', but more efficient.
-}
schedulePair :: (Monad m, MonadSchedule m) => Automaton m a b -> Automaton m a b -> Automaton m a b
schedulePair (Automaton automatonL) (Automaton automatonR) = Automaton $! Stateful $! scheduleStreams (toStreamT automatonL) (toStreamT automatonR)
where
scheduleStreams :: (Monad m, MonadSchedule m) => StreamT m b -> StreamT m b -> StreamT m b
scheduleStreams (StreamT stateL0 stepL) (StreamT stateR0 stepR) =
StreamT
{ state = (stepL stateL0, stepR stateR0)
, step
}
where
step (runningL, runningR) = do
result <- race runningL runningR
case result of
Left (Result stateL' b, runningR') -> return $ Result (stepL stateL', runningR') b
Right (runningL', Result stateR' b) -> return $ Result (runningL', stepR stateR') b
schedulePair automatonL automatonR = concatS $ fmap toList $ scheduleList $ automatonL :| [automatonR]

-- | Run two running clocks concurrently.
runningSchedule ::
Expand Down
85 changes: 85 additions & 0 deletions rhine/src/FRP/Rhine/Schedule/Internal.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
{-# LANGUAGE ExistentialQuantification #-}

module FRP.Rhine.Schedule.Internal where

-- base
import Control.Arrow
import Data.Function ((&))
import Data.Functor ((<&>))
import Data.Functor.Compose (Compose (..))
import Data.Kind (Type)
import Data.List.NonEmpty as N

-- foldable1-classes-compat
import Data.Foldable1 (Foldable1 (foldrMap1))

-- sop-core
import Data.SOP (HCollapse (hcollapse), HSequence (htraverse'), I (..), K (K), NP (..), NS (..), SListI, apInjs_NP, hliftA, hzipWith, unI)

-- monad-schedule
import Control.Monad.Schedule.Class

-- automaton
import Data.Stream hiding (concatS)
import Data.Stream.Result

-- | One step of a stream, with the state type argument going last, so it is usable with sop-core.
newtype Step m b state = Step {getStep :: ResultStateT state m b}

-- | The result of a stream, with the type arguments swapped, so it's usable with sop-core
newtype RunningResult b state = RunningResult {getRunningResult :: Result state b}

-- | Transform an n-ary product of at least one type into a nonempty list of all its content.
apInjs_NPNonEmpty :: (SListI xs) => NP f (x ': xs) -> NonEmpty (NS f (x ': xs))

Check warning on line 33 in rhine/src/FRP/Rhine/Schedule/Internal.hs

View workflow job for this annotation

GitHub Actions / Run hlint

Suggestion in apInjs_NPNonEmpty in module FRP.Rhine.Schedule.Internal: Use camelCase ▫︎ Found: "apInjs_NPNonEmpty ::\n (SListI xs) => NP f (x ': xs) -> NonEmpty (NS f (x ': xs))" ▫︎ Perhaps: "apInjsNPNonEmpty ::\n (SListI xs) => NP f (x ': xs) -> NonEmpty (NS f (x ': xs))"
apInjs_NPNonEmpty (fx :* fxs) = Z fx :| (S <$> apInjs_NP fxs)

Check warning on line 34 in rhine/src/FRP/Rhine/Schedule/Internal.hs

View workflow job for this annotation

GitHub Actions / Run hlint

Suggestion in apInjs_NPNonEmpty in module FRP.Rhine.Schedule.Internal: Use camelCase ▫︎ Found: "apInjs_NPNonEmpty (fx :* fxs) = ..." ▫︎ Perhaps: "apInjsNPNonEmpty (fx :* fxs) = ..."

-- | A nonempty list of 'StreamT's, unzipped into their states and their steps.
data Streams m b = forall state (states :: [Type]).
(SListI states) =>
Streams
{ states :: NP I (state ': states)
, steps :: NP (Step m b) (state ': states)
}

-- | Run 'Streams' concurrently by scheduling them in 'MonadSchedule'.
scheduleStreams :: (MonadSchedule m, Functor m, Applicative m) => Streams m b -> StreamT m (NonEmpty b)
scheduleStreams Streams {states, steps} =
StreamT
{ state = (apInjs_NPNonEmpty states, []) -- All the initial states and no currently running continuations
, step =
-- Some streams have not started yet, or just finished their step. Others are still running.
\(restingStates, runningStreams) ->
-- Start all currently not running streams by zipping each with its step
fmap (htraverse' getCompose . hzipWith (\Step {getStep} -> Compose . fmap RunningResult . getResultStateT getStep . unI) steps) restingStates
-- Append all already running states to the freshly started ones
& flip appendList runningStreams
-- Schedule all running streams concurrently
& schedule
-- Separate into finished streams and still running streams
& fmap
( \(finished, running) ->
let finishedStates = finished <&> (hliftA (getRunningResult >>> resultState >>> I))

Check warning on line 61 in rhine/src/FRP/Rhine/Schedule/Internal.hs

View workflow job for this annotation

GitHub Actions / Run hlint

Suggestion in scheduleStreams in module FRP.Rhine.Schedule.Internal: Redundant bracket ▫︎ Found: "finished <&> (hliftA (getRunningResult >>> resultState >>> I))" ▫︎ Perhaps: "finished <&> hliftA (getRunningResult >>> resultState >>> I)"
outputs =
finished
<&> (hliftA (getRunningResult >>> output >>> K) >>> hcollapse)
in Result (finishedStates, running) outputs
)
}

-- | Run a nonempty list of streams concurrently.
scheduleStreams' :: (MonadSchedule m, Applicative m) => NonEmpty (StreamT m b) -> StreamT m (NonEmpty b)
scheduleStreams' = scheduleStreams . foldrMap1 buildStreams consStreams
where
buildStreams :: StreamT m b -> Streams m b
buildStreams StreamT {state, step} =
Streams
{ states = I state :* Nil
, steps = Step (ResultStateT step) :* Nil
}

consStreams :: StreamT m b -> Streams m b -> Streams m b
consStreams StreamT {state, step} Streams {states, steps} =
Streams
{ states = I state :* states
, steps = Step (ResultStateT step) :* steps
}

0 comments on commit 927e67c

Please sign in to comment.