diff --git a/rhine/rhine.cabal b/rhine/rhine.cabal index 9ed57eb7..12624811 100644 --- a/rhine/rhine.cabal +++ b/rhine/rhine.cabal @@ -138,6 +138,7 @@ library other-modules: FRP.Rhine.ClSF.Except.Util FRP.Rhine.ClSF.Random.Util + FRP.Rhine.Schedule.Internal -- LANGUAGE extensions used by modules in this package. -- other-extensions: @@ -146,6 +147,7 @@ library MonadRandom >=0.5, containers >=0.5, deepseq >=1.4, + foldable1-classes-compat ^>=0.1, free >=5.1, mmorph ^>=1.2, profunctors ^>=5.6, diff --git a/rhine/src/FRP/Rhine/Schedule.hs b/rhine/src/FRP/Rhine/Schedule.hs index e2784a6d..d2dbbbc4 100644 --- a/rhine/src/FRP/Rhine/Schedule.hs +++ b/rhine/src/FRP/Rhine/Schedule.hs @@ -20,22 +20,16 @@ module FRP.Rhine.Schedule where import Control.Arrow import Data.List.NonEmpty as N --- transformers -import Control.Monad.Trans.Reader - -- monad-schedule import Control.Monad.Schedule.Class -- automaton import Data.Automaton -import Data.Automaton.Recursive (getRecursive, toRecursive) -import Data.Stream import Data.Stream.Optimized (OptimizedStreamT (..), toStreamT) -import Data.Stream.Recursive qualified as StreamRecursive -import Data.Stream.Result -- rhine import FRP.Rhine.Clock +import FRP.Rhine.Schedule.Internal -- * Scheduling @@ -48,35 +42,15 @@ scheduleList :: (Monad m, MonadSchedule m) => NonEmpty (Automaton m a b) -> Auto scheduleList automatons0 = Automaton $ Stateful $ - StreamT - { state = (getRecursive . toRecursive <$> automatons0, []) - , step = \(automatons, running) -> ReaderT $ \a -> do - let bsAndConts = flip (runReaderT . StreamRecursive.getRecursive) a <$> automatons - (done, running') <- schedule (N.head bsAndConts :| N.tail bsAndConts ++ running) - return $ Result (resultState <$> done, running') $ output <$> done - } + scheduleStreams' $ + toStreamT . getAutomaton <$> automatons0 {- | Run two automata concurrently. Whenever one automaton returns a value, it is returned. - -This is similar to 'scheduleList', but more efficient. -} schedulePair :: (Monad m, MonadSchedule m) => Automaton m a b -> Automaton m a b -> Automaton m a b -schedulePair (Automaton automatonL) (Automaton automatonR) = Automaton $! Stateful $! scheduleStreams (toStreamT automatonL) (toStreamT automatonR) - where - scheduleStreams :: (Monad m, MonadSchedule m) => StreamT m b -> StreamT m b -> StreamT m b - scheduleStreams (StreamT stateL0 stepL) (StreamT stateR0 stepR) = - StreamT - { state = (stepL stateL0, stepR stateR0) - , step - } - where - step (runningL, runningR) = do - result <- race runningL runningR - case result of - Left (Result stateL' b, runningR') -> return $ Result (stepL stateL', runningR') b - Right (runningL', Result stateR' b) -> return $ Result (runningL', stepR stateR') b +schedulePair automatonL automatonR = concatS $ fmap toList $ scheduleList $ automatonL :| [automatonR] -- | Run two running clocks concurrently. runningSchedule :: diff --git a/rhine/src/FRP/Rhine/Schedule/Internal.hs b/rhine/src/FRP/Rhine/Schedule/Internal.hs new file mode 100644 index 00000000..263fb446 --- /dev/null +++ b/rhine/src/FRP/Rhine/Schedule/Internal.hs @@ -0,0 +1,85 @@ +{-# LANGUAGE ExistentialQuantification #-} + +module FRP.Rhine.Schedule.Internal where + +-- base +import Control.Arrow +import Data.Function ((&)) +import Data.Functor ((<&>)) +import Data.Functor.Compose (Compose (..)) +import Data.Kind (Type) +import Data.List.NonEmpty as N + +-- foldable1-classes-compat +import Data.Foldable1 (Foldable1 (foldrMap1)) + +-- sop-core +import Data.SOP (HCollapse (hcollapse), HSequence (htraverse'), I (..), K (K), NP (..), NS (..), SListI, apInjs_NP, hliftA, hzipWith, unI) + +-- monad-schedule +import Control.Monad.Schedule.Class + +-- automaton +import Data.Stream hiding (concatS) +import Data.Stream.Result + +-- | One step of a stream, with the state type argument going last, so it is usable with sop-core. +newtype Step m b state = Step {getStep :: ResultStateT state m b} + +-- | The result of a stream, with the type arguments swapped, so it's usable with sop-core +newtype RunningResult b state = RunningResult {getRunningResult :: Result state b} + +-- | Transform an n-ary product of at least one type into a nonempty list of all its content. +apInjs_NPNonEmpty :: (SListI xs) => NP f (x ': xs) -> NonEmpty (NS f (x ': xs)) +apInjs_NPNonEmpty (fx :* fxs) = Z fx :| (S <$> apInjs_NP fxs) + +-- | A nonempty list of 'StreamT's, unzipped into their states and their steps. +data Streams m b = forall state (states :: [Type]). + (SListI states) => + Streams + { states :: NP I (state ': states) + , steps :: NP (Step m b) (state ': states) + } + +-- | Run 'Streams' concurrently by scheduling them in 'MonadSchedule'. +scheduleStreams :: (MonadSchedule m, Functor m, Applicative m) => Streams m b -> StreamT m (NonEmpty b) +scheduleStreams Streams {states, steps} = + StreamT + { state = (apInjs_NPNonEmpty states, []) -- All the initial states and no currently running continuations + , step = + -- Some streams have not started yet, or just finished their step. Others are still running. + \(restingStates, runningStreams) -> + -- Start all currently not running streams by zipping each with its step + fmap (htraverse' getCompose . hzipWith (\Step {getStep} -> Compose . fmap RunningResult . getResultStateT getStep . unI) steps) restingStates + -- Append all already running states to the freshly started ones + & flip appendList runningStreams + -- Schedule all running streams concurrently + & schedule + -- Separate into finished streams and still running streams + & fmap + ( \(finished, running) -> + let finishedStates = finished <&> (hliftA (getRunningResult >>> resultState >>> I)) + outputs = + finished + <&> (hliftA (getRunningResult >>> output >>> K) >>> hcollapse) + in Result (finishedStates, running) outputs + ) + } + +-- | Run a nonempty list of streams concurrently. +scheduleStreams' :: (MonadSchedule m, Applicative m) => NonEmpty (StreamT m b) -> StreamT m (NonEmpty b) +scheduleStreams' = scheduleStreams . foldrMap1 buildStreams consStreams + where + buildStreams :: StreamT m b -> Streams m b + buildStreams StreamT {state, step} = + Streams + { states = I state :* Nil + , steps = Step (ResultStateT step) :* Nil + } + + consStreams :: StreamT m b -> Streams m b -> Streams m b + consStreams StreamT {state, step} Streams {states, steps} = + Streams + { states = I state :* states + , steps = Step (ResultStateT step) :* steps + }