diff --git a/src/Streamly/Internal/Data/Stream/Concurrent.hs b/src/Streamly/Internal/Data/Stream/Concurrent.hs index f0d8f2dc1a..da60539cbd 100644 --- a/src/Streamly/Internal/Data/Stream/Concurrent.hs +++ b/src/Streamly/Internal/Data/Stream/Concurrent.hs @@ -61,11 +61,14 @@ module Streamly.Internal.Data.Stream.Concurrent -- | Shares a single channel across many streams. , parConcat , parConcatMap + , parMergeMap -- *** ConcatIterate , parConcatIterate + , parMergeIterate -- ** Reactive + , parYieldWith , fromCallback , parTapCount , tapCount @@ -140,6 +143,8 @@ parEvalD modifier m = D.Stream step Nothing D.Stop -> D.Stop -} +-- XXX Rename to parBuffered + -- | 'parEval' evaluates a stream as a whole asynchronously with respect to -- the consumer of the stream. A worker thread evaluates multiple elements of -- the stream ahead of time and buffers the results; the consumer of the stream @@ -330,6 +335,15 @@ parConcatMap modifier f stream = $ parConcatMapK modifier (Stream.toStreamK . f) (Stream.toStreamK stream) +-- | Same as 'mergeMapWith interleave' but concurrent. +-- +-- /Unimplemented/ +-- +{-# INLINE parMergeMap #-} +parMergeMap :: -- MonadAsync m => + (Config -> Config) -> (a -> Stream m b) -> Stream m a -> Stream m b +parMergeMap _modifier _f _stream = undefined + -- | Evaluate the streams in the input stream concurrently and combine them. -- -- >>> parConcat modifier = Stream.parConcatMap modifier id @@ -343,6 +357,8 @@ parConcat modifier = parConcatMap modifier id -- concat Lists ------------------------------------------------------------------------------- +-- XXX Rename to parListCat? + -- | Like 'parConcat' but works on a list of streams. -- -- >>> parList modifier = Stream.parConcat modifier . Stream.fromList @@ -408,6 +424,8 @@ parListEagerMin = parList (eager True . stopWhen AnyStops) -- Applicative ------------------------------------------------------------------------------- +-- XXX Rename to parStreamApply? + -- | Apply an argument stream to a function stream concurrently. Uses a -- shared channel for all individual applications within a stream application. {-# INLINE parApply #-} @@ -551,6 +569,17 @@ parConcatIterate modifier f input = -- XXX The channel q should be FIFO for DFS, otherwise it is BFS generate chan x = x `K.cons` iterateStream chan (Stream.toStreamK $ f x) +-- | Same as 'mergeIterateWith interleave' but concurrent. +-- +-- /Unimplemented/ +{-# INLINE parMergeIterate #-} +parMergeIterate :: -- MonadAsync m => + (Config -> Config) + -> (a -> Stream m a) + -> Stream m a + -> Stream m a +parMergeIterate _modifier _f _input = undefined + ------------------------------------------------------------------------------- -- Generate ------------------------------------------------------------------------------- @@ -622,22 +651,25 @@ newCallbackStream = do -- XXX Use fromChannelD? return (callback, fromChannel chan) --- XXX Rename this to parSetCallback. Also take the Channel config as argument. --- What config can be set by user here? --- --- XXX What happens if an exception occurs when evaluating the stream? The --- result of callback can be used to communicate that. But we can only know --- about the exception on the next callback call. For better handling the user --- can supply an exception sender function as argument to fromCallback. - --- | @fromCallback f@ creates an entangled pair of a callback and a stream i.e. --- whenever the callback is called a value appears in the stream. The function --- @f@ is invoked with the callback as argument, and the stream is returned. --- @f@ would store the callback for calling it later for generating values in --- the stream. --- --- The callback queues a value to a concurrent channel associated with the --- stream. The stream can be evaluated safely in any thread. +-- | @fromCallback action@ runs @action@ with a callback which is used by +-- the action to send values that appear in the resulting stream. The action +-- must be run in a separate thread independent of the one in which the stream +-- is being evaluated. The action is supposed to be run forever in an infinite +-- loop. +-- +-- Example: +-- +-- >> import Control.Concurrent (threadDelay, forkIO) +-- >> import Control.Monad (void, forever) +-- >> import qualified Streamly.Data.Fold as Fold +-- >> import qualified Streamly.Data.Stream.Prelude as Stream +-- >> +-- >> main = do +-- >> Stream.fold (Fold.drainMapM print) +-- >> $ Stream.fromCallback +-- >> $ \yield -> +-- >> void $ forkIO $ forever $ do +-- >> yield "x" >> threadDelay 1000000 -- -- /Pre-release/ -- @@ -648,6 +680,29 @@ fromCallback setCallback = Stream.concatEffect $ do setCallback callback return stream +-- XXX What happens if an exception occurs when evaluating the stream? The +-- result of callback can be used to communicate that. But we can only know +-- about the exception on the next callback call. For better handling the user +-- can supply an exception sender function as argument to fromCallback. Or +-- maybe we should just forward all exceptions the parent stream. +-- +-- XXX A serial version of yieldWith? +-- XXX For folds a parAwaitWith is possible. +-- XXX For pipes parYieldAwaitWith + +-- | An improved version of 'fromCallback'. +-- +-- * Takes a channel config modifier +-- * Evaluate the action in a parallel thread +-- * The action is supplied with a yield function to yield values to the stream +-- * Any exception generated is forwarded to the stream +-- * Sends a Stop event when the action is done. +-- +-- /Unimplemented/ +parYieldWith :: -- MonadAsync m => + (Config -> Config) -> ((a -> m b) -> m c) -> Stream m a +parYieldWith = undefined + -- | @parTapCount predicate fold stream@ taps the count of those elements in -- the stream that pass the @predicate@. The resulting count stream is sent to -- a @fold@ running concurrently in another thread.