Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add docs and some unimplemented prototypes #2937

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 71 additions & 16 deletions src/Streamly/Internal/Data/Stream/Concurrent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,14 @@ module Streamly.Internal.Data.Stream.Concurrent
-- | Shares a single channel across many streams.
, parConcat
, parConcatMap
, parMergeMap

-- *** ConcatIterate
, parConcatIterate
, parMergeIterate

-- ** Reactive
, parYieldWith
, fromCallback
, parTapCount
, tapCount
Expand Down Expand Up @@ -140,6 +143,8 @@ parEvalD modifier m = D.Stream step Nothing
D.Stop -> D.Stop
-}

-- XXX Rename to parBuffered

-- | 'parEval' evaluates a stream as a whole asynchronously with respect to
-- the consumer of the stream. A worker thread evaluates multiple elements of
-- the stream ahead of time and buffers the results; the consumer of the stream
Expand Down Expand Up @@ -330,6 +335,15 @@ parConcatMap modifier f stream =
$ parConcatMapK
modifier (Stream.toStreamK . f) (Stream.toStreamK stream)

-- | Same as 'mergeMapWith interleave' but concurrent.
--
-- /Unimplemented/
--
{-# INLINE parMergeMap #-}
parMergeMap :: -- MonadAsync m =>
(Config -> Config) -> (a -> Stream m b) -> Stream m a -> Stream m b
parMergeMap _modifier _f _stream = undefined

-- | Evaluate the streams in the input stream concurrently and combine them.
--
-- >>> parConcat modifier = Stream.parConcatMap modifier id
Expand All @@ -343,6 +357,8 @@ parConcat modifier = parConcatMap modifier id
-- concat Lists
-------------------------------------------------------------------------------

-- XXX Rename to parListCat?

-- | Like 'parConcat' but works on a list of streams.
--
-- >>> parList modifier = Stream.parConcat modifier . Stream.fromList
Expand Down Expand Up @@ -408,6 +424,8 @@ parListEagerMin = parList (eager True . stopWhen AnyStops)
-- Applicative
-------------------------------------------------------------------------------

-- XXX Rename to parStreamApply?

-- | Apply an argument stream to a function stream concurrently. Uses a
-- shared channel for all individual applications within a stream application.
{-# INLINE parApply #-}
Expand Down Expand Up @@ -551,6 +569,17 @@ parConcatIterate modifier f input =
-- XXX The channel q should be FIFO for DFS, otherwise it is BFS
generate chan x = x `K.cons` iterateStream chan (Stream.toStreamK $ f x)

-- | Same as 'mergeIterateWith interleave' but concurrent.
--
-- /Unimplemented/
{-# INLINE parMergeIterate #-}
parMergeIterate :: -- MonadAsync m =>
(Config -> Config)
-> (a -> Stream m a)
-> Stream m a
-> Stream m a
parMergeIterate _modifier _f _input = undefined

-------------------------------------------------------------------------------
-- Generate
-------------------------------------------------------------------------------
Expand Down Expand Up @@ -622,22 +651,25 @@ newCallbackStream = do
-- XXX Use fromChannelD?
return (callback, fromChannel chan)

-- XXX Rename this to parSetCallback. Also take the Channel config as argument.
-- What config can be set by user here?
--
-- XXX What happens if an exception occurs when evaluating the stream? The
-- result of callback can be used to communicate that. But we can only know
-- about the exception on the next callback call. For better handling the user
-- can supply an exception sender function as argument to fromCallback.

-- | @fromCallback f@ creates an entangled pair of a callback and a stream i.e.
-- whenever the callback is called a value appears in the stream. The function
-- @f@ is invoked with the callback as argument, and the stream is returned.
-- @f@ would store the callback for calling it later for generating values in
-- the stream.
--
-- The callback queues a value to a concurrent channel associated with the
-- stream. The stream can be evaluated safely in any thread.
-- | @fromCallback action@ runs @action@ with a callback which is used by
-- the action to send values that appear in the resulting stream. The action
-- must be run in a separate thread independent of the one in which the stream
-- is being evaluated. The action is supposed to be run forever in an infinite
-- loop.
--
-- Example:
--
-- >> import Control.Concurrent (threadDelay, forkIO)
-- >> import Control.Monad (void, forever)
-- >> import qualified Streamly.Data.Fold as Fold
-- >> import qualified Streamly.Data.Stream.Prelude as Stream
-- >>
-- >> main = do
-- >> Stream.fold (Fold.drainMapM print)
-- >> $ Stream.fromCallback
-- >> $ \yield ->
-- >> void $ forkIO $ forever $ do
-- >> yield "x" >> threadDelay 1000000
--
-- /Pre-release/
--
Expand All @@ -648,6 +680,29 @@ fromCallback setCallback = Stream.concatEffect $ do
setCallback callback
return stream

-- XXX What happens if an exception occurs when evaluating the stream? The
-- result of callback can be used to communicate that. But we can only know
-- about the exception on the next callback call. For better handling the user
-- can supply an exception sender function as argument to fromCallback. Or
-- maybe we should just forward all exceptions the parent stream.
--
-- XXX A serial version of yieldWith?
-- XXX For folds a parAwaitWith is possible.
-- XXX For pipes parYieldAwaitWith

-- | An improved version of 'fromCallback'.
--
-- * Takes a channel config modifier
-- * Evaluate the action in a parallel thread
-- * The action is supplied with a yield function to yield values to the stream
-- * Any exception generated is forwarded to the stream
-- * Sends a Stop event when the action is done.
--
-- /Unimplemented/
parYieldWith :: -- MonadAsync m =>
(Config -> Config) -> ((a -> m b) -> m c) -> Stream m a
parYieldWith = undefined

-- | @parTapCount predicate fold stream@ taps the count of those elements in
-- the stream that pass the @predicate@. The resulting count stream is sent to
-- a @fold@ running concurrently in another thread.
Expand Down
Loading