From ad05792d0b361c4629fc3009f69e21988eeaae88 Mon Sep 17 00:00:00 2001 From: pranaysashank Date: Tue, 8 Sep 2020 12:55:22 +0530 Subject: [PATCH] Deprecate Streamly module and move the exports to Streamly.Prelude. - Deprecate and replace functions: foldWith => concatFoldableWith foldMapWith => concatMapFoldableWith forEachWith => concatForFoldableWith --- Changelog.md | 9 + README.md | 34 +- benchmark/NanoBenchmarks.hs | 2 +- benchmark/Streamly/Benchmark/Data/ArrayOps.hs | 1 - benchmark/Streamly/Benchmark/Data/Fold.hs | 4 +- benchmark/Streamly/Benchmark/Data/Parser.hs | 5 +- .../Streamly/Benchmark/Data/Parser/ParserD.hs | 5 +- .../Streamly/Benchmark/Data/Parser/ParserK.hs | 5 +- .../Streamly/Benchmark/Data/Stream/StreamK.hs | 10 +- .../Streamly/Benchmark/Prelude/Adaptive.hs | 1 - benchmark/Streamly/Benchmark/Prelude/Ahead.hs | 2 +- benchmark/Streamly/Benchmark/Prelude/Async.hs | 2 +- .../Streamly/Benchmark/Prelude/Concurrent.hs | 2 +- .../Streamly/Benchmark/Prelude/Parallel.hs | 4 +- benchmark/Streamly/Benchmark/Prelude/Rate.hs | 5 +- .../Streamly/Benchmark/Prelude/Serial.hs | 6 +- .../Streamly/Benchmark/Prelude/WAsync.hs | 2 +- .../Streamly/Benchmark/Prelude/WSerial.hs | 2 +- .../Streamly/Benchmark/Prelude/ZipAsync.hs | 3 +- .../Streamly/Benchmark/Prelude/ZipSerial.hs | 10 +- benchmark/lib/Streamly/Benchmark/Common.hs | 2 +- benchmark/lib/Streamly/Benchmark/Prelude.hs | 16 +- examples/AcidRain.hs | 11 +- examples/CirclingSquare.hs | 5 +- examples/ControlFlow.hs | 9 +- examples/EchoServer.hs | 5 +- examples/FileSinkServer.hs | 3 +- examples/FromFileClient.hs | 3 +- examples/ListDir.hs | 3 +- examples/MergeSort.hs | 2 +- examples/Rate.hs | 5 +- examples/SearchQuery.hs | 14 +- examples/WordCount.hs | 3 +- src/Streamly.hs | 42 ++- src/Streamly/Internal/Data/Pipe.hs | 2 +- src/Streamly/Internal/Data/SVar.hs | 11 +- src/Streamly/Internal/Data/Stream/Ahead.hs | 22 +- src/Streamly/Internal/Data/Stream/Async.hs | 46 ++- .../Internal/Data/Stream/Combinators.hs | 28 +- src/Streamly/Internal/Data/Stream/IsStream.hs | 117 +++++-- src/Streamly/Internal/Data/Stream/Parallel.hs | 21 +- src/Streamly/Internal/Data/Stream/Prelude.hs | 49 +-- src/Streamly/Internal/Data/Stream/Serial.hs | 28 +- .../Internal/Data/Stream/StreamK/Type.hs | 12 +- src/Streamly/Internal/Data/Stream/Zip.hs | 24 +- .../Internal/FileSystem/Event/Darwin.hs | 2 +- .../Internal/FileSystem/Event/Linux.hs | 2 +- src/Streamly/Internal/FileSystem/Handle.hs | 2 +- src/Streamly/Internal/Network/Inet/TCP.hs | 2 +- src/Streamly/Internal/Network/Socket.hs | 2 +- src/Streamly/Internal/Unicode/Array/Char.hs | 3 +- .../Internal/Unicode/Array/Prim/Pinned.hs | 3 +- src/Streamly/Internal/Unicode/Char.hs | 2 +- src/Streamly/Internal/Unicode/Stream.hs | 2 +- src/Streamly/Prelude.hs | 290 +++++++++++++++++- src/Streamly/Tutorial.hs | 165 +++++----- test/Main.hs | 72 +++-- test/PureStreams.hs | 6 +- test/Streamly/Test/Array.hs | 3 +- test/Streamly/Test/FileSystem/Event.hs | 2 +- test/Streamly/Test/Prelude.hs | 16 +- test/Streamly/Test/Prelude/Ahead.hs | 2 +- test/Streamly/Test/Prelude/Async.hs | 2 +- test/Streamly/Test/Prelude/Concurrent.hs | 2 +- test/Streamly/Test/Prelude/Parallel.hs | 2 +- test/Streamly/Test/Prelude/Serial.hs | 3 +- test/Streamly/Test/Prelude/WAsync.hs | 2 +- test/Streamly/Test/Prelude/WSerial.hs | 2 +- test/Streamly/Test/Prelude/ZipAsync.hs | 2 +- test/Streamly/Test/Prelude/ZipSerial.hs | 2 +- test/loops.hs | 6 +- test/nested-loops.hs | 6 +- test/parallel-loops.hs | 2 +- 73 files changed, 819 insertions(+), 385 deletions(-) diff --git a/Changelog.md b/Changelog.md index 94ae6a866f..885f72b41b 100644 --- a/Changelog.md +++ b/Changelog.md @@ -28,6 +28,15 @@ fully drained. Also, the resource acquisition and release is atomic with respect to async exceptions. +### Deprecations + +* The `Streamly` module is now deprecated, its functionality is subsumed + by `Streamly.Prelude`. +* Some functions from `Streamly` module have been renamed in `Streamly.Prelude` module: + * `foldWith` has been replaced by `concatFoldableWith` + * `foldMapWith` has been replaced by `concatMapFoldableWith` + * `forEachWith` has been replaced by `concatForFoldableWith` + ## 0.7.2 ### Bug Fixes diff --git a/README.md b/README.md index c7a6331c06..22179ea2e4 100644 --- a/README.md +++ b/README.md @@ -106,9 +106,9 @@ Please see [INSTALL.md](./INSTALL.md) for instructions on how to use streamly with your Haskell build tool or package manager. You may want to go through it before jumping to run the examples below. -The module `Streamly` provides just the core stream types, type casting and -concurrency control combinators. Stream construction, transformation, folding, -merging, zipping combinators are found in `Streamly.Prelude`. +The module `Streamly.Prelude` provides the core stream types and combinators +for type casting, controlling concurrency, stream construction, transformation, +folding, merging, and zipping. ## Streaming Pipelines @@ -117,7 +117,6 @@ numbers from stdin, prints the squares of even numbers and exits if an even number more than 9 is entered. ``` haskell -import Streamly import qualified Streamly.Prelude as S import Data.Function ((&)) @@ -150,7 +149,7 @@ each action is finished (asyncly): ``` haskell > let p n = threadDelay (n * 1000000) >> return n -> S.toList $ asyncly $ p 3 |: p 2 |: p 1 |: S.nil +> S.toList $ S.asyncly $ p 3 |: p 2 |: p 1 |: S.nil [1,2,3] ``` @@ -158,7 +157,7 @@ Use `aheadly` if you want speculative concurrency i.e. execute the actions in the stream concurrently but consume the results in the specified order: ``` haskell -> S.toList $ aheadly $ p 3 |: p 2 |: p 1 |: S.nil +> S.toList $ S.aheadly $ p 3 |: p 2 |: p 1 |: S.nil [3,2,1] ``` @@ -168,7 +167,7 @@ Monadic stream generation functions e.g. `unfoldrM`, `replicateM`, `repeatM`, The following finishes in 10 seconds (100 seconds when serial): ``` haskell -S.drain $ asyncly $ S.replicateM 10 $ p 10 +S.drain $ S.asyncly $ S.replicateM 10 $ p 10 ``` ## Concurrency Auto Scaling @@ -177,7 +176,7 @@ Concurrency is auto-scaled i.e. more actions are executed concurrently if the consumer is consuming the stream at a higher speed. How many tasks are executed concurrently can be controlled by `maxThreads` and how many results are buffered ahead of consumption can be controlled by `maxBuffer`. See the -documentation in the `Streamly` module. +documentation in the `Streamly.Prelude` module. ## Concurrent Streaming Pipelines @@ -198,7 +197,7 @@ We can use `mapM` or `sequence` functions concurrently on a stream. ``` haskell > let p n = threadDelay (n * 1000000) >> return n -> S.drain $ aheadly $ S.mapM (\x -> p 1 >> print x) (serially $ repeatM (p 1)) +> S.drain $ S.aheadly $ S.mapM (\x -> p 1 >> print x) (S.serially $ S.repeatM (p 1)) ``` ## Serial and Concurrent Merging @@ -209,11 +208,10 @@ stream, each with a delay of 1 to 10 seconds, respectively. Since all the actions are concurrent we see one output printed every second: ``` haskell -import Streamly import qualified Streamly.Prelude as S import Control.Concurrent (threadDelay) -main = S.toList $ parallely $ foldMap delay [1..10] +main = S.toList $ S.parallely $ foldMap delay [1..10] where delay n = S.yieldM $ threadDelay (n * 1000000) >> print n ``` @@ -222,7 +220,6 @@ below, see the tutorial for more ways. We use the following `delay` function in the examples to demonstrate the concurrency aspects: ``` haskell -import Streamly import qualified Streamly.Prelude as S import Control.Concurrent @@ -245,7 +242,7 @@ ThreadId 36: Delay 1 ### Parallel ``` haskell -main = S.drain . parallely $ delay 3 <> delay 2 <> delay 1 +main = S.drain . S.parallely $ delay 3 <> delay 2 <> delay 1 ``` ``` ThreadId 42: Delay 1 @@ -258,7 +255,6 @@ ThreadId 40: Delay 3 The monad instance composes like a list monad. ``` haskell -import Streamly import qualified Streamly.Prelude as S loops = do @@ -282,7 +278,7 @@ loop can run concurrently but the results are presented to the consumer of the output in the same order as serial execution: ``` haskell -main = S.drain $ aheadly $ loops +main = S.drain $ S.aheadly $ loops ``` Different stream types execute the loop iterations in different ways. For @@ -298,11 +294,10 @@ to concurrently generate squares of a stream of numbers and then concurrently sum the square roots of all combinations of two streams: ``` haskell -import Streamly import qualified Streamly.Prelude as S main = do - s <- S.sum $ asyncly $ do + s <- S.sum $ S.asyncly $ do -- Each square is performed concurrently, (<>) is concurrent x2 <- foldMap (\x -> return $ x * x) [1..100] y2 <- foldMap (\y -> return $ y * y) [1..100] @@ -324,7 +319,7 @@ directories concurrently: ```haskell import Control.Monad.IO.Class (liftIO) import Path.IO (listDir, getCurrentDir) -- from path-io package -import Streamly (AsyncT, adapt) +import Streamly.Prelude (AsyncT, adapt) import qualified Streamly.Prelude as S listDirRecursive :: AsyncT IO () @@ -350,10 +345,9 @@ For bounded concurrent streams, stream yield rate can be specified. For example, to print hello once every second you can simply write this: ``` haskell -import Streamly import Streamly.Prelude as S -main = S.drain $ asyncly $ avgRate 1 $ S.repeatM $ putStrLn "hello" +main = S.drain $ S.asyncly $ S.avgRate 1 $ S.repeatM $ putStrLn "hello" ``` For some practical uses of rate control, see diff --git a/benchmark/NanoBenchmarks.hs b/benchmark/NanoBenchmarks.hs index 4eef064d4b..c7f1ef68e3 100644 --- a/benchmark/NanoBenchmarks.hs +++ b/benchmark/NanoBenchmarks.hs @@ -6,7 +6,7 @@ {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE ScopedTypeVariables #-} -import Streamly (SerialT) +import Streamly.Prelude (SerialT) import Streamly.Internal.Data.SVar (MonadAsync) import qualified Streamly.Data.Array.Storable.Foreign as A diff --git a/benchmark/Streamly/Benchmark/Data/ArrayOps.hs b/benchmark/Streamly/Benchmark/Data/ArrayOps.hs index 4199d6c322..b9fab38ab1 100644 --- a/benchmark/Streamly/Benchmark/Data/ArrayOps.hs +++ b/benchmark/Streamly/Benchmark/Data/ArrayOps.hs @@ -21,7 +21,6 @@ import Control.Monad.IO.Class (MonadIO) import Prelude (Bool, Int, Maybe(..), ($), (+), (.), (==), (>), undefined) import qualified Prelude as P -import qualified Streamly as S hiding (foldMapWith, runStream) import qualified Streamly.Prelude as S #ifndef DATA_PRIM_ARRAY diff --git a/benchmark/Streamly/Benchmark/Data/Fold.hs b/benchmark/Streamly/Benchmark/Data/Fold.hs index bb1b474b1b..2838870f52 100644 --- a/benchmark/Streamly/Benchmark/Data/Fold.hs +++ b/benchmark/Streamly/Benchmark/Data/Fold.hs @@ -17,7 +17,6 @@ import Prelude (IO, Int, Double, String, (>), (<*>), (<$>), (+), ($), (<=), Monad(..), (==), Maybe(..), (.), fromIntegral, compare, (>=), concat, seq) -import qualified Streamly as S hiding (runStream) import qualified Streamly.Prelude as S import qualified Streamly.Internal.Data.Fold as FL import qualified Streamly.Internal.Data.Pipe as Pipe @@ -30,7 +29,6 @@ import qualified Streamly.Internal.Data.Fold as IFL import qualified Streamly.Internal.Data.Stream.IsStream as IP import Gauge -import Streamly hiding (runStream) import Streamly.Benchmark.Common -- We need a monadic bind here to make sure that the function f does not get @@ -52,7 +50,7 @@ source = sourceUnfoldrM -- | Takes a fold method, and uses it with a default source. {-# INLINE benchIOSink #-} benchIOSink - :: (IsStream t, NFData b) + :: (S.IsStream t, NFData b) => Int -> String -> (t IO Int -> IO b) -> Benchmark benchIOSink value name f = bench name $ nfIO $ randomRIO (1,1) >>= f . source value diff --git a/benchmark/Streamly/Benchmark/Data/Parser.hs b/benchmark/Streamly/Benchmark/Data/Parser.hs index ff3736cf45..f620a2fe09 100644 --- a/benchmark/Streamly/Benchmark/Data/Parser.hs +++ b/benchmark/Streamly/Benchmark/Data/Parser.hs @@ -24,14 +24,13 @@ import Prelude import qualified Data.Traversable as TR import qualified Data.Foldable as F import qualified Control.Applicative as AP -import qualified Streamly as S hiding (runStream) import qualified Streamly.Prelude as S import qualified Streamly.Internal.Data.Fold as FL import qualified Streamly.Internal.Data.Parser as PR import qualified Streamly.Internal.Data.Stream.IsStream as IP import Gauge -import Streamly hiding (runStream) +import Streamly.Prelude (SerialT) import Streamly.Benchmark.Common ------------------------------------------------------------------------------- @@ -52,7 +51,7 @@ sourceUnfoldrM value n = S.unfoldrM step n -- | Takes a fold method, and uses it with a default source. {-# INLINE benchIOSink #-} benchIOSink - :: (IsStream t, NFData b) + :: (S.IsStream t, NFData b) => Int -> String -> (t IO Int -> IO b) -> Benchmark benchIOSink value name f = bench name $ nfIO $ randomRIO (1,1) >>= f . sourceUnfoldrM value diff --git a/benchmark/Streamly/Benchmark/Data/Parser/ParserD.hs b/benchmark/Streamly/Benchmark/Data/Parser/ParserD.hs index fd117296db..7275cef280 100644 --- a/benchmark/Streamly/Benchmark/Data/Parser/ParserD.hs +++ b/benchmark/Streamly/Benchmark/Data/Parser/ParserD.hs @@ -22,14 +22,13 @@ import Prelude hiding (any, all, take, sequence, sequenceA, takeWhile) import qualified Data.Traversable as TR import qualified Data.Foldable as F import qualified Control.Applicative as AP -import qualified Streamly as S hiding (runStream) import qualified Streamly.Prelude as S import qualified Streamly.Internal.Data.Fold as FL import qualified Streamly.Internal.Data.Parser.ParserD as PR import qualified Streamly.Internal.Data.Stream.IsStream as IP import Gauge -import Streamly hiding (runStream) +import Streamly.Prelude (SerialT, MonadAsync, IsStream) import Streamly.Benchmark.Common ------------------------------------------------------------------------------- @@ -39,7 +38,7 @@ import Streamly.Benchmark.Common -- XXX these can be moved to the common module {-# INLINE sourceUnfoldrM #-} -sourceUnfoldrM :: (S.IsStream t, S.MonadAsync m) => Int -> Int -> t m Int +sourceUnfoldrM :: (IsStream t, MonadAsync m) => Int -> Int -> t m Int sourceUnfoldrM value n = S.unfoldrM step n where step cnt = diff --git a/benchmark/Streamly/Benchmark/Data/Parser/ParserK.hs b/benchmark/Streamly/Benchmark/Data/Parser/ParserK.hs index 47fde4d06a..3de0f706f1 100644 --- a/benchmark/Streamly/Benchmark/Data/Parser/ParserK.hs +++ b/benchmark/Streamly/Benchmark/Data/Parser/ParserK.hs @@ -23,7 +23,6 @@ import Prelude hiding (any, all, take, sequence, sequenceA, takeWhile) import qualified Control.Applicative as AP import qualified Data.Foldable as F import qualified Data.Traversable as TR -import qualified Streamly as S hiding (runStream) import qualified Streamly.Prelude as S import qualified Streamly.Internal.Data.Fold as FL import qualified Streamly.Internal.Data.Parser.ParserK.Types as PR @@ -31,7 +30,7 @@ import qualified Streamly.Internal.Data.Parser.ParserD as PRD import qualified Streamly.Internal.Data.Stream.IsStream as IP import Gauge -import Streamly hiding (runStream) +import Streamly.Prelude (SerialT, MonadAsync, IsStream) import Streamly.Benchmark.Common ------------------------------------------------------------------------------- @@ -41,7 +40,7 @@ import Streamly.Benchmark.Common -- XXX these can be moved to the common module {-# INLINE sourceUnfoldrM #-} -sourceUnfoldrM :: (S.IsStream t, S.MonadAsync m) => Int -> Int -> t m Int +sourceUnfoldrM :: (IsStream t, MonadAsync m) => Int -> Int -> t m Int sourceUnfoldrM value n = S.unfoldrM step n where step cnt = diff --git a/benchmark/Streamly/Benchmark/Data/Stream/StreamK.hs b/benchmark/Streamly/Benchmark/Data/Stream/StreamK.hs index 18e61cd427..0b0e06fd1c 100644 --- a/benchmark/Streamly/Benchmark/Data/Stream/StreamK.hs +++ b/benchmark/Streamly/Benchmark/Data/Stream/StreamK.hs @@ -121,11 +121,11 @@ sourceFromFoldableM n = {-# INLINE sourceFoldMapWith #-} sourceFoldMapWith :: Int -> Stream m Int -sourceFoldMapWith n = SP.foldMapWith S.serial S.yield [n..n+value] +sourceFoldMapWith n = SP.concatMapFoldableWith S.serial S.yield [n..n+value] {-# INLINE sourceFoldMapWithM #-} sourceFoldMapWithM :: Monad m => Int -> Stream m Int -sourceFoldMapWithM n = SP.foldMapWith S.serial (S.yieldM . return) [n..n+value] +sourceFoldMapWithM n = SP.concatMapFoldableWith S.serial (S.yieldM . return) [n..n+value] ------------------------------------------------------------------------------- -- Elimination @@ -484,8 +484,8 @@ o_1_space = , benchFold "fromFoldableM" toNull sourceFromFoldableM -- appends - , benchFold "foldMapWith" toNull sourceFoldMapWith - , benchFold "foldMapWithM" toNull sourceFoldMapWithM + , benchFold "concatMapFoldableWith" toNull sourceFoldMapWith + , benchFold "concatMapFoldableWithM" toNull sourceFoldMapWithM ] , bgroup "elimination" [ benchFold "toNull" toNull sourceUnfoldrM @@ -542,7 +542,7 @@ o_1_space = , benchIOSrc1 "concatMapRepl (sqrt n of sqrt n)" (concatMapRepl value2 value2) - -- This is for comparison with foldMapWith + -- This is for comparison with concatMapFoldableWith , benchIOSrc1 "concatMapWithId (n of 1) (fromFoldable)" (S.drain . S.concatMapBy S.serial id . sourceConcatMapId value) diff --git a/benchmark/Streamly/Benchmark/Prelude/Adaptive.hs b/benchmark/Streamly/Benchmark/Prelude/Adaptive.hs index 0d6f68637e..04e570d6ec 100644 --- a/benchmark/Streamly/Benchmark/Prelude/Adaptive.hs +++ b/benchmark/Streamly/Benchmark/Prelude/Adaptive.hs @@ -9,7 +9,6 @@ import Control.Concurrent (threadDelay) import Control.Monad (when) import Control.Monad.IO.Class (liftIO) import Gauge -import Streamly import Streamly.Prelude as S import System.Random (randomRIO) diff --git a/benchmark/Streamly/Benchmark/Prelude/Ahead.hs b/benchmark/Streamly/Benchmark/Prelude/Ahead.hs index 99e87116f7..5486539af9 100644 --- a/benchmark/Streamly/Benchmark/Prelude/Ahead.hs +++ b/benchmark/Streamly/Benchmark/Prelude/Ahead.hs @@ -7,7 +7,7 @@ import Prelude hiding (mapM) -import Streamly (aheadly, serially, ahead, maxBuffer, maxThreads) +import Streamly.Prelude (aheadly, serially, ahead, maxBuffer, maxThreads) import qualified Streamly.Prelude as S import Streamly.Benchmark.Common diff --git a/benchmark/Streamly/Benchmark/Prelude/Async.hs b/benchmark/Streamly/Benchmark/Prelude/Async.hs index 0e2dc5a1ed..ea017f633e 100644 --- a/benchmark/Streamly/Benchmark/Prelude/Async.hs +++ b/benchmark/Streamly/Benchmark/Prelude/Async.hs @@ -7,7 +7,7 @@ import Prelude hiding (mapM) -import Streamly (asyncly, async, maxBuffer, maxThreads, serially) +import Streamly.Prelude (asyncly, async, maxBuffer, maxThreads, serially) import qualified Streamly.Prelude as S import qualified Streamly.Internal.Data.Stream.StreamK.Type as Internal diff --git a/benchmark/Streamly/Benchmark/Prelude/Concurrent.hs b/benchmark/Streamly/Benchmark/Prelude/Concurrent.hs index 84c72648f2..cc138b6a01 100644 --- a/benchmark/Streamly/Benchmark/Prelude/Concurrent.hs +++ b/benchmark/Streamly/Benchmark/Prelude/Concurrent.hs @@ -10,7 +10,7 @@ import Control.Concurrent import Control.Monad (when, replicateM) import Gauge -import Streamly +import Streamly.Prelude hiding (mapM_, replicateM) import qualified Streamly.Prelude as S ------------------------------------------------------------------------------- diff --git a/benchmark/Streamly/Benchmark/Prelude/Parallel.hs b/benchmark/Streamly/Benchmark/Prelude/Parallel.hs index 846f32318d..869a8121de 100644 --- a/benchmark/Streamly/Benchmark/Prelude/Parallel.hs +++ b/benchmark/Streamly/Benchmark/Prelude/Parallel.hs @@ -9,9 +9,9 @@ import Prelude hiding (mapM) -import Streamly (SerialT, parallely, parallel, serially, maxBuffer, maxThreads) +import Streamly.Prelude + ( SerialT, parallely, parallel, serially, maxBuffer, maxThreads) -import qualified Streamly as S import qualified Streamly.Prelude as S import qualified Streamly.Internal.Data.Fold as FL import qualified Streamly.Internal.Data.Stream.Parallel as Par diff --git a/benchmark/Streamly/Benchmark/Prelude/Rate.hs b/benchmark/Streamly/Benchmark/Prelude/Rate.hs index 7e702a10f2..28784ab2dc 100644 --- a/benchmark/Streamly/Benchmark/Prelude/Rate.hs +++ b/benchmark/Streamly/Benchmark/Prelude/Rate.hs @@ -5,9 +5,8 @@ -- License : BSD3 -- Maintainer : streamly@composewell.com -import Streamly (asyncly, aheadly, maxThreads) - -import qualified Streamly as S +import Streamly.Prelude (asyncly, aheadly, maxThreads) +import qualified Streamly.Prelude as S import Streamly.Benchmark.Common import Streamly.Benchmark.Prelude diff --git a/benchmark/Streamly/Benchmark/Prelude/Serial.hs b/benchmark/Streamly/Benchmark/Prelude/Serial.hs index fe2da5e96b..e65a986375 100644 --- a/benchmark/Streamly/Benchmark/Prelude/Serial.hs +++ b/benchmark/Streamly/Benchmark/Prelude/Serial.hs @@ -22,6 +22,8 @@ {-# OPTIONS_GHC -fplugin Test.Inspection.Plugin #-} #endif +module Main where + import Control.DeepSeq (NFData(..)) import Control.Monad (when) import Control.Monad.IO.Class (MonadIO(..)) @@ -48,14 +50,14 @@ import Test.Inspection import qualified Streamly.Internal.Data.Stream.StreamD as D #endif -import qualified Streamly as S + import qualified Streamly.Prelude as S import qualified Streamly.Internal.Data.Stream.IsStream as Internal import qualified Streamly.Internal.Data.Fold as FL import qualified Streamly.Internal.Data.Unfold as UF import Gauge -import Streamly +import Streamly.Prelude (SerialT, IsStream, serially, serial) import Streamly.Benchmark.Common import Streamly.Benchmark.Prelude import Streamly.Internal.Data.Time.Units diff --git a/benchmark/Streamly/Benchmark/Prelude/WAsync.hs b/benchmark/Streamly/Benchmark/Prelude/WAsync.hs index d46dc43994..b63e039dd8 100644 --- a/benchmark/Streamly/Benchmark/Prelude/WAsync.hs +++ b/benchmark/Streamly/Benchmark/Prelude/WAsync.hs @@ -7,7 +7,7 @@ import Prelude hiding (mapM) -import Streamly (wAsyncly, serially, wAsync, maxBuffer, maxThreads) +import Streamly.Prelude (wAsyncly, serially, wAsync, maxBuffer, maxThreads) import qualified Streamly.Prelude as S import Streamly.Benchmark.Common diff --git a/benchmark/Streamly/Benchmark/Prelude/WSerial.hs b/benchmark/Streamly/Benchmark/Prelude/WSerial.hs index d18630e0e1..775b11e664 100644 --- a/benchmark/Streamly/Benchmark/Prelude/WSerial.hs +++ b/benchmark/Streamly/Benchmark/Prelude/WSerial.hs @@ -19,7 +19,7 @@ {-# OPTIONS_GHC -fplugin Test.Inspection.Plugin #-} #endif -import Streamly +import Streamly.Prelude (wSerial, wSerially) import qualified Streamly.Prelude as S import qualified Streamly.Internal.Data.Stream.IsStream as Internal import qualified Streamly.Internal.Data.Unfold as UF diff --git a/benchmark/Streamly/Benchmark/Prelude/ZipAsync.hs b/benchmark/Streamly/Benchmark/Prelude/ZipAsync.hs index 9117544d90..fddf183fc6 100644 --- a/benchmark/Streamly/Benchmark/Prelude/ZipAsync.hs +++ b/benchmark/Streamly/Benchmark/Prelude/ZipAsync.hs @@ -7,8 +7,7 @@ {-# LANGUAGE FlexibleContexts #-} -import Streamly (serially) -import qualified Streamly as S +import Streamly.Prelude (serially) import qualified Streamly.Prelude as S import Streamly.Benchmark.Common diff --git a/benchmark/Streamly/Benchmark/Prelude/ZipSerial.hs b/benchmark/Streamly/Benchmark/Prelude/ZipSerial.hs index 6320e9d0c7..934421373c 100644 --- a/benchmark/Streamly/Benchmark/Prelude/ZipSerial.hs +++ b/benchmark/Streamly/Benchmark/Prelude/ZipSerial.hs @@ -21,8 +21,7 @@ import Prelude hiding (zip) -import Streamly (zipSerially) -import qualified Streamly as S +import Streamly.Prelude (MonadAsync) import qualified Streamly.Prelude as S import Streamly.Benchmark.Common @@ -47,7 +46,7 @@ moduleName = "Prelude.ZipSerial" -- XXX somehow copying this definition here instead of importing it performs -- better. Need to investigate why. {-# INLINE sourceUnfoldrM #-} -sourceUnfoldrM :: (S.IsStream t, S.MonadAsync m) => Int -> Int -> t m Int +sourceUnfoldrM :: (S.IsStream t, MonadAsync m) => Int -> Int -> t m Int sourceUnfoldrM count start = S.unfoldrM step start where step cnt = @@ -100,7 +99,7 @@ o_1_space_joining value = o_1_space_mapping :: Int -> [Benchmark] o_1_space_mapping value = [ bgroup "mapping" - [ benchIOSink value "fmap" $ fmapN zipSerially 1 + [ benchIOSink value "fmap" $ fmapN S.zipSerially 1 ] ] @@ -113,7 +112,7 @@ o_1_space_outerProduct :: Int -> [Benchmark] o_1_space_outerProduct value = [ bgroup "monad-outer-product" -- XXX needs fixing - [ benchIO "toNullAp" $ toNullAp value zipSerially + [ benchIO "toNullAp" $ toNullAp value S.zipSerially ] ] -} @@ -140,4 +139,3 @@ main = do -- , o_1_space_outerProduct size ] ] - diff --git a/benchmark/lib/Streamly/Benchmark/Common.hs b/benchmark/lib/Streamly/Benchmark/Common.hs index 9f04b44013..d49c73c3fb 100644 --- a/benchmark/lib/Streamly/Benchmark/Common.hs +++ b/benchmark/lib/Streamly/Benchmark/Common.hs @@ -48,7 +48,7 @@ import System.Random (randomRIO) import qualified Streamly.Prelude as S -import Streamly +import Streamly.Prelude (SerialT) import Gauge ------------------------------------------------------------------------------- diff --git a/benchmark/lib/Streamly/Benchmark/Prelude.hs b/benchmark/lib/Streamly/Benchmark/Prelude.hs index 09325fa83d..2b7831e1d1 100644 --- a/benchmark/lib/Streamly/Benchmark/Prelude.hs +++ b/benchmark/lib/Streamly/Benchmark/Prelude.hs @@ -16,11 +16,13 @@ import Control.Applicative (liftA2) import Control.DeepSeq (NFData(..)) import Control.Exception (try) import Data.Functor.Identity (Identity) +#if !(MIN_VERSION_base(4,11,0)) +import Data.Semigroup (Semigroup((<>))) +#endif import GHC.Exception (ErrorCall) import System.Random (randomRIO) import qualified Data.Foldable as F -import qualified Streamly as S import qualified Streamly.Prelude as S import qualified Streamly.Internal.Data.Stream.IsStream as Internal import qualified Streamly.Internal.Data.Pipe as Pipe @@ -293,21 +295,21 @@ transformZipMapM t n = ------------------------------------------------------------------------------- {-# INLINE sourceFoldMapWith #-} -sourceFoldMapWith :: (S.IsStream t, S.Semigroup (t m Int)) +sourceFoldMapWith :: (S.IsStream t, Semigroup (t m Int)) => Int -> Int -> t m Int -sourceFoldMapWith value n = S.foldMapWith (S.<>) S.yield [n..n+value] +sourceFoldMapWith value n = S.concatMapFoldableWith (<>) S.yield [n..n+value] {-# INLINE sourceFoldMapWithStream #-} -sourceFoldMapWithStream :: (S.IsStream t, S.Semigroup (t m Int)) +sourceFoldMapWithStream :: (S.IsStream t, Semigroup (t m Int)) => Int -> Int -> t m Int -sourceFoldMapWithStream value n = S.foldMapWith (S.<>) S.yield +sourceFoldMapWithStream value n = S.concatMapFoldableWith (<>) S.yield $ (S.enumerateFromTo n (n + value) :: S.SerialT Identity Int) {-# INLINE sourceFoldMapWithM #-} -sourceFoldMapWithM :: (S.IsStream t, Monad m, S.Semigroup (t m Int)) +sourceFoldMapWithM :: (S.IsStream t, Monad m, Semigroup (t m Int)) => Int -> Int -> t m Int sourceFoldMapWithM value n = - S.foldMapWith (S.<>) (S.yieldM . return) [n..n+value] + S.concatMapFoldableWith (<>) (S.yieldM . return) [n..n+value] {-# INLINE sourceFoldMapM #-} sourceFoldMapM :: (S.IsStream t, Monad m, Monoid (t m Int)) diff --git a/examples/AcidRain.hs b/examples/AcidRain.hs index ebbcfaf50c..9fc349746c 100644 --- a/examples/AcidRain.hs +++ b/examples/AcidRain.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE CPP #-} {-# LANGUAGE FlexibleContexts #-} -- Copyright : (c) 2017 Harendra Kumar @@ -6,7 +7,9 @@ -- This example is adapted from Gabriel Gonzalez's pipes-concurrency package. -- https://hackage.haskell.org/package/pipes-concurrency-2.0.8/docs/Pipes-Concurrent-Tutorial.html -import Streamly +#if !(MIN_VERSION_base(4,11,0)) +import Data.Semigroup ((<>)) +#endif import Streamly.Prelude as S import Control.Monad (void) import Control.Monad.IO.Class (MonadIO(liftIO)) @@ -26,13 +29,13 @@ userAction = S.repeatM $ liftIO askUser _ -> putStrLn "Type potion or harm or quit" >> askUser acidRain :: MonadAsync m => SerialT m Event -acidRain = asyncly $ constRate 1 $ S.repeatM $ liftIO $ return $ Harm 1 +acidRain = S.asyncly $ S.constRate 1 $ S.repeatM $ liftIO $ return $ Harm 1 data Result = Check | Done runEvents :: (MonadAsync m, MonadState Int m) => SerialT m Result runEvents = do - event <- userAction `parallel` acidRain + event <- userAction `S.parallel` acidRain case event of Harm n -> modify (\h -> h - n) >> return Check Heal n -> modify (\h -> h + n) >> return Check @@ -52,7 +55,7 @@ getStatus result = main :: IO () main = do - putStrLn "Your health is deteriorating due to acid rain,\ + putStrLn "Your health is deteriorating due to acid rain,\\ \ type \"potion\" or \"quit\"" let runGame = S.drainWhile (== Alive) $ S.mapM getStatus runEvents void $ runStateT runGame 60 diff --git a/examples/CirclingSquare.hs b/examples/CirclingSquare.hs index 7bedd1096c..b23f7c6f17 100644 --- a/examples/CirclingSquare.hs +++ b/examples/CirclingSquare.hs @@ -8,7 +8,6 @@ import Data.IORef import Graphics.UI.SDL as SDL -import Streamly import Streamly.Prelude as S ------------------------------------------------------------------------------ @@ -78,6 +77,6 @@ main :: IO () main = do sdlInit cref <- newIORef (0,0) - S.drain $ asyncly $ constRate 40 + S.drain $ S.asyncly $ S.constRate 40 $ S.repeatM (updateController cref) - `parallel` S.repeatM (updateDisplay cref) + `S.parallel` S.repeatM (updateDisplay cref) diff --git a/examples/ControlFlow.hs b/examples/ControlFlow.hs index 985f713703..bb2c5209a5 100644 --- a/examples/ControlFlow.hs +++ b/examples/ControlFlow.hs @@ -1,4 +1,5 @@ -{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE CPP #-} +{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE ScopedTypeVariables #-} ------------------------------------------------------------------------------- @@ -14,8 +15,12 @@ -- This file provides an example where we enter a sequence of characters "x", -- and "y" on separate lines, on the command line. When any other sequence is -- entered the control flow short circuits at the first non-matching char and + -- exits. +#if !(MIN_VERSION_base(4,11,0)) +import Data.Semigroup (Semigroup(..)) +#endif import Control.Concurrent (threadDelay) import Control.Exception (catch, SomeException) import Control.Monad @@ -25,7 +30,7 @@ import Control.Monad.Trans.Class import Control.Monad.Trans.Maybe import Control.Monad.Trans.Except import Control.Monad.Trans.Cont -import Streamly +import Streamly.Prelude import qualified Streamly.Prelude as S ------------------------------------------------------------------------------- diff --git a/examples/EchoServer.hs b/examples/EchoServer.hs index a9c4c6ca9f..37ad334548 100644 --- a/examples/EchoServer.hs +++ b/examples/EchoServer.hs @@ -2,7 +2,6 @@ import Data.Function ((&)) -import Streamly import Streamly.Internal.Network.Socket (handleWithM) import Streamly.Network.Socket @@ -11,8 +10,8 @@ import qualified Streamly.Prelude as S main :: IO () main = - serially (S.unfold TCP.acceptOnPort 8091) - & parallely . S.mapM (handleWithM echo) + S.serially (S.unfold TCP.acceptOnPort 8091) + & S.parallely . S.mapM (handleWithM echo) & S.drain where diff --git a/examples/FileSinkServer.hs b/examples/FileSinkServer.hs index 443cd370d3..307468c6c1 100644 --- a/examples/FileSinkServer.hs +++ b/examples/FileSinkServer.hs @@ -9,7 +9,6 @@ import Control.Monad.IO.Class (liftIO) import Network.Socket (close) import System.Environment (getArgs) -import Streamly import Streamly.Unicode.Stream import qualified Streamly.FileSystem.Handle as FH import qualified Streamly.Data.Array.Storable.Foreign as A @@ -26,7 +25,7 @@ main = do (\src -> S.fold (FH.write src) $ encodeLatin1Lax $ S.concatUnfold A.read - $ S.concatMapWith parallel use + $ S.concatMapWith S.parallel use $ S.unfold TCP.acceptOnPort 8090) where diff --git a/examples/FromFileClient.hs b/examples/FromFileClient.hs index fcde22acf5..8c2bde16f4 100644 --- a/examples/FromFileClient.hs +++ b/examples/FromFileClient.hs @@ -5,7 +5,6 @@ import System.Environment (getArgs) -import Streamly import qualified Streamly.Prelude as S import qualified Streamly.Internal.FileSystem.Handle as IFH import qualified Streamly.Internal.Network.Inet.TCP as TCP @@ -18,4 +17,4 @@ main = withFile file ReadMode $ \src -> S.fold (TCP.writeChunks (127, 0, 0, 1) 8090) $ IFH.toChunks src - in getArgs >>= S.drain . parallely . S.mapM sendFile . S.fromList + in getArgs >>= S.drain . S.parallely . S.mapM sendFile . S.fromList diff --git a/examples/ListDir.hs b/examples/ListDir.hs index a08f9de7b4..3e419fccf0 100644 --- a/examples/ListDir.hs +++ b/examples/ListDir.hs @@ -3,7 +3,6 @@ module Main (main) where import Data.Bifunctor (bimap) import Data.Function ((&)) import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering)) -import Streamly (ahead) import qualified Streamly.Prelude as S import qualified Streamly.Internal.Data.Stream.IsStream as S @@ -14,7 +13,7 @@ import qualified Streamly.Internal.FileSystem.Dir as Dir main :: IO () main = do hSetBuffering stdout LineBuffering - S.mapM_ print $ S.iterateMapLeftsWith ahead listDir (S.yield $ (Left ".")) + S.mapM_ print $ S.iterateMapLeftsWith S.ahead listDir (S.yield $ (Left ".")) where diff --git a/examples/MergeSort.hs b/examples/MergeSort.hs index 6f8228b845..176bcb374e 100644 --- a/examples/MergeSort.hs +++ b/examples/MergeSort.hs @@ -10,7 +10,7 @@ import Data.Word import System.Random (getStdGen, randoms) import Data.List (sort) -import Streamly +import Streamly.Prelude (Serial) import qualified Streamly.Prelude as S getSorted :: Serial Word16 diff --git a/examples/Rate.hs b/examples/Rate.hs index f50f2dd7ca..d4b139951a 100644 --- a/examples/Rate.hs +++ b/examples/Rate.hs @@ -1,11 +1,10 @@ -import Streamly import qualified Streamly.Prelude as S import qualified Streamly.Internal.Data.Stream.IsStream as Internal main :: IO () main = S.mapM_ print - $ asyncly - $ avgRate 1 + $ S.asyncly + $ S.avgRate 1 $ Internal.timestamped $ S.repeatM (pure "tick") diff --git a/examples/SearchQuery.hs b/examples/SearchQuery.hs index e163ff67f8..06274a6d50 100644 --- a/examples/SearchQuery.hs +++ b/examples/SearchQuery.hs @@ -1,7 +1,13 @@ -import Streamly +{-# LANGUAGE CPP #-} + +#if !(MIN_VERSION_base(4,11,0)) +import Data.Semigroup ((<>)) +#endif import Streamly.Prelude (drain, nil, yieldM, (|:)) import Network.HTTP.Simple +import qualified Streamly.Prelude as S + -- | Runs three search engine queries in parallel and prints the search engine -- names in the fastest first order. -- @@ -10,13 +16,13 @@ import Network.HTTP.Simple main :: IO () main = do putStrLn "Using parallel stream construction" - drain . parallely $ google |: bing |: duckduckgo |: nil + drain . S.parallely $ google |: bing |: duckduckgo |: nil putStrLn "\nUsing parallel semigroup composition" - drain . parallely $ yieldM google <> yieldM bing <> yieldM duckduckgo + drain . S.parallely $ yieldM google <> yieldM bing <> yieldM duckduckgo putStrLn "\nUsing parallel applicative zip" - drain . zipAsyncly $ + drain . S.zipAsyncly $ (,,) <$> yieldM google <*> yieldM bing <*> yieldM duckduckgo where diff --git a/examples/WordCount.hs b/examples/WordCount.hs index d5b8dc00cd..d16fac19d3 100644 --- a/examples/WordCount.hs +++ b/examples/WordCount.hs @@ -40,11 +40,10 @@ import Streamly.Internal.Unicode.Stream (DecodeState, DecodeError(..), CodePoint, decodeUtf8Either, resumeDecodeUtf8Either) -import qualified Streamly as S +import qualified Streamly.Internal.Data.Stream.IsStream as S import qualified Streamly.Unicode.Stream as S import qualified Streamly.FileSystem.Handle as FH import qualified Streamly.Data.Array.Storable.Foreign as A -import qualified Streamly.Prelude as S import qualified Data.Vector.Storable.Mutable as V ------------------------------------------------------------------------------- diff --git a/src/Streamly.hs b/src/Streamly.hs index fff6d76d7a..cd3597c839 100644 --- a/src/Streamly.hs +++ b/src/Streamly.hs @@ -51,11 +51,12 @@ -- please see the "Streamly.Tutorial" module and the examples directory in this -- package. +{-# LANGUAGE NoMonomorphismRestriction #-} {-# OPTIONS_GHC -Wno-orphans #-} #include "inline.hs" -module Streamly +module Streamly {-# DEPRECATED "Please use \"Streamly.Prelude\" instead." #-} ( -- -- * Concepts Overview -- -- ** Streams @@ -181,13 +182,12 @@ module Streamly -- finite container of streams. Note that these are just special cases of -- the more general 'concatMapWith' operation. -- - , IP.foldWith - , IP.foldMapWith - , IP.forEachWith + , foldWith + , foldMapWith + , forEachWith -- * Re-exports , Semigroup (..) - -- * Deprecated , Streaming , runStream @@ -206,14 +206,6 @@ module Streamly , zippingAsync , (<=>) , (<|) - - {- - -- * Deprecated/Moved - -- | These APIs have been moved to other modules - , foldWith - , foldMapWith - , forEachWith - -} ) where @@ -640,3 +632,27 @@ mkAsync = return . Async.mkAsync -- which specific type you are converting from or to. If you see a an -- @ambiguous type variable@ error then most likely you are using 'adapt' -- unnecessarily on polymorphic code. + +-- | Same as 'Streamly.Prelude.concatFoldableWith' +-- +-- @since 0.1.0 +{-# DEPRECATED foldWith "Please use 'Streamly.Prelude.concatFoldableWith' instead." #-} +{-# INLINEABLE foldWith #-} +foldWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a +foldWith = P.concatFoldableWith + +-- | Same as 'Streamly.Prelude.concatMapFoldableWith' +-- +-- @since 0.1.0 +{-# DEPRECATED foldMapWith "Please use 'Streamly.Prelude.concatMapFoldableWith' instead." #-} +{-# INLINEABLE foldMapWith #-} +foldMapWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b +foldMapWith = P.concatMapFoldableWith + +-- | Same as 'Streamly.Prelude.concatForFoldableWith' +-- +-- @since 0.1.0 +{-# DEPRECATED forEachWith "Please use 'Streamly.Prelude.concatForFoldableWith' instead." #-} +{-# INLINEABLE forEachWith #-} +forEachWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b +forEachWith = P.concatForFoldableWith diff --git a/src/Streamly/Internal/Data/Pipe.hs b/src/Streamly/Internal/Data/Pipe.hs index 4ea6a48e2f..ea3901fc20 100644 --- a/src/Streamly/Internal/Data/Pipe.hs +++ b/src/Streamly/Internal/Data/Pipe.hs @@ -242,7 +242,7 @@ import Prelude -- import qualified Data.Map.Strict as Map -- import qualified Prelude --- import Streamly (MonadAsync, parallel) +-- import Streamly.Prelude (MonadAsync, parallel) -- import Streamly.Data.Fold.Types (Fold(..)) import Streamly.Internal.Data.Pipe.Types (Pipe(..), PipeState(..), Step(..), zipWith, tee, map, compose) diff --git a/src/Streamly/Internal/Data/SVar.hs b/src/Streamly/Internal/Data/SVar.hs index c5c0167bda..3a1cfac1d1 100644 --- a/src/Streamly/Internal/Data/SVar.hs +++ b/src/Streamly/Internal/Data/SVar.hs @@ -228,7 +228,6 @@ data WorkerInfo = WorkerInfo , workerLatencyStart :: IORef (Count, AbsTime) } - -- | Specifies the stream yield rate in yields per second (@Hertz@). -- We keep accumulating yield credits at 'rateGoal'. At any point of time we -- allow only as many yields as we have accumulated as per 'rateGoal' since the @@ -248,7 +247,9 @@ data WorkerInfo = WorkerInfo -- If the 'rateGoal' is 0 or negative the stream never yields a value. -- If the 'rateBuffer' is 0 or negative we do not attempt to recover. -- --- @since 0.5.0 +-- /Since: 0.5.0 ("Streamly")/ +-- +-- @since 0.8.0 data Rate = Rate { rateLow :: Double -- ^ The lower rate limit , rateGoal :: Double -- ^ The target rate we want to achieve @@ -897,11 +898,15 @@ withDiagMVar sv label action = -- Spawning threads ------------------------------------------------------------------------------ +-- /Since: 0.8.0 ("Streamly.Prelude")/ +-- -- | A monad that can perform concurrent or parallel IO operations. Streams -- that can be composed concurrently require the underlying monad to be -- 'MonadAsync'. -- --- @since 0.1.0 +-- /Since: 0.1.0 ("Streamly")/ +-- +-- @since 0.8.0 type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m) -- When we run computations concurrently, we completely isolate the state of diff --git a/src/Streamly/Internal/Data/Stream/Ahead.hs b/src/Streamly/Internal/Data/Stream/Ahead.hs index 7f18b8ca1d..ac6fc0a549 100644 --- a/src/Streamly/Internal/Data/Stream/Ahead.hs +++ b/src/Streamly/Internal/Data/Stream/Ahead.hs @@ -596,7 +596,9 @@ infixr 6 `ahead` -- | Polymorphic version of the 'Semigroup' operation '<>' of 'AheadT'. -- Merges two streams sequentially but with concurrent lookahead. -- --- @since 0.3.0 +-- /Since: 0.3.0 ("Streamly")/ +-- +-- @since 0.8.0 {-# INLINE ahead #-} ahead :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a ahead m1 m2 = mkStream $ \st yld sng stp -> @@ -628,12 +630,12 @@ consMAhead m r = fromStream $ K.yieldM m `ahead` (toStream r) -- container of streams. -- -- @ --- import "Streamly" +-- import "Streamly.Prelude" -- import qualified "Streamly.Prelude" as S -- import Control.Concurrent -- -- main = do --- xs \<- S.'toList' . 'aheadly' $ (p 1 |: p 2 |: nil) <> (p 3 |: p 4 |: nil) +-- xs \<- S.'toList' . S.'aheadly' $ (p 1 |: p 2 |: nil) <> (p 3 |: p 4 |: nil) -- print xs -- where p n = threadDelay 1000000 >> return n -- @ @@ -650,7 +652,7 @@ consMAhead m r = fromStream $ K.yieldM m `ahead` (toStream r) -- 'SerialT'. -- -- @ --- main = S.drain . 'aheadly' $ do +-- main = S.drain . S.'aheadly' $ do -- n <- return 3 \<\> return 2 \<\> return 1 -- S.yieldM $ do -- threadDelay (n * 1000000) @@ -662,19 +664,25 @@ consMAhead m r = fromStream $ K.yieldM m `ahead` (toStream r) -- ThreadId 38: Delay 3 -- @ -- --- @since 0.3.0 +-- /Since: 0.3.0 ("Streamly")/ +-- +-- @since 0.8.0 newtype AheadT m a = AheadT {getAheadT :: Stream m a} deriving (MonadTrans) -- | A serial IO stream of elements of type @a@ with concurrent lookahead. See -- 'AheadT' documentation for more details. -- --- @since 0.3.0 +-- /Since: 0.3.0 ("Streamly")/ +-- +-- @since 0.8.0 type Ahead = AheadT IO -- | Fix the type of a polymorphic stream as 'AheadT'. -- --- @since 0.3.0 +-- /Since: 0.3.0 ("Streamly")/ +-- +-- @since 0.8.0 aheadly :: IsStream t => AheadT m a -> t m a aheadly = K.adapt diff --git a/src/Streamly/Internal/Data/Stream/Async.hs b/src/Streamly/Internal/Data/Stream/Async.hs index 6e52addb9a..db8db20e85 100644 --- a/src/Streamly/Internal/Data/Stream/Async.hs +++ b/src/Streamly/Internal/Data/Stream/Async.hs @@ -513,6 +513,8 @@ mkAsyncD m = D.Stream step Nothing D.Skip s -> D.Skip (Just $ D.Stream step1 s) D.Stop -> D.Stop +-- /Since: 0.8.0 ("Streamly.Prelude")/ +-- -- This is slightly faster than the CPS version above -- -- | Make the stream producer and consumer run concurrently by introducing a @@ -635,7 +637,9 @@ infixr 6 `async` -- Merges two streams possibly concurrently, preferring the -- elements from the left one when available. -- --- @since 0.2.0 +-- /Since: 0.2.0 ("Streamly")/ +-- +-- @since 0.8.0 {-# INLINE async #-} async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a async = joinStreamVarAsync AsyncVar @@ -675,11 +679,10 @@ consMAsync m r = fromStream $ K.yieldM m `async` (toStream r) -- operation can be used to fold an infinite lazy container of streams. -- -- @ --- import "Streamly" -- import qualified "Streamly.Prelude" as S -- import Control.Concurrent -- --- main = (S.toList . 'asyncly' $ (S.fromList [1,2]) \<> (S.fromList [3,4])) >>= print +-- main = (S.toList . S.'asyncly' $ (S.fromList [1,2]) \<> (S.fromList [3,4])) >>= print -- @ -- @ -- [1,2,3,4] @@ -698,7 +701,7 @@ consMAsync m r = fromStream $ K.yieldM m `async` (toStream r) -- consumer. -- -- @ --- main = 'drain' . 'asyncly' $ do +-- main = S.'drain' . S.'asyncly' $ do -- n <- return 3 \<\> return 2 \<\> return 1 -- S.yieldM $ do -- threadDelay (n * 1000000) @@ -710,19 +713,25 @@ consMAsync m r = fromStream $ K.yieldM m `async` (toStream r) -- ThreadId 38: Delay 3 -- @ -- --- @since 0.1.0 +-- /Since: 0.1.0 ("Streamly")/ +-- +-- @since 0.8.0 newtype AsyncT m a = AsyncT {getAsyncT :: Stream m a} deriving (MonadTrans) -- | A demand driven left biased parallely composing IO stream of elements of -- type @a@. See 'AsyncT' documentation for more details. -- --- @since 0.2.0 +-- /Since: 0.2.0 ("Streamly")/ +-- +-- @since 0.8.0 type Async = AsyncT IO -- | Fix the type of a polymorphic stream as 'AsyncT'. -- --- @since 0.1.0 +-- /Since: 0.1.0 ("Streamly")/ +-- +-- @since 0.8.0 asyncly :: IsStream t => AsyncT m a -> t m a asyncly = adapt @@ -810,7 +819,9 @@ infixr 6 `wAsync` -- | Polymorphic version of the 'Semigroup' operation '<>' of 'WAsyncT'. -- Merges two streams concurrently choosing elements from both fairly. -- --- @since 0.2.0 +-- @since 0.8.0 +-- +-- /Since: 0.2.0 ("Streamly")/ {-# INLINE wAsync #-} wAsync :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a wAsync = joinStreamVarAsync WAsyncVar @@ -828,11 +839,10 @@ wAsync = joinStreamVarAsync WAsyncVar -- contrast to the depth wise scheduling behavior of 'AsyncT'. -- -- @ --- import "Streamly" -- import qualified "Streamly.Prelude" as S -- import Control.Concurrent -- --- main = (S.toList . 'wAsyncly' . maxThreads 1 $ (S.fromList [1,2]) \<> (S.fromList [3,4])) >>= print +-- main = (S.toList . S.'wAsyncly' . S.maxThreads 1 $ (S.fromList [1,2]) \<> (S.fromList [3,4])) >>= print -- @ -- @ -- [1,3,2,4] @@ -843,7 +853,7 @@ wAsync = joinStreamVarAsync WAsyncVar -- now take a more general example: -- -- @ --- main = (S.toList . 'wAsyncly' . maxThreads 1 $ (S.fromList [1,2,3]) \<> (S.fromList [4,5,6]) \<> (S.fromList [7,8,9])) >>= print +-- main = (S.toList . S.'wAsyncly' . S.maxThreads 1 $ (S.fromList [1,2,3]) \<> (S.fromList [4,5,6]) \<> (S.fromList [7,8,9])) >>= print -- @ -- @ -- [1,4,2,7,5,3,8,6,9] @@ -899,7 +909,7 @@ wAsync = joinStreamVarAsync WAsyncVar -- concurrently using a round robin scheduling. -- -- @ --- main = 'drain' . 'wAsyncly' $ do +-- main = S.'drain' . S.'wAsyncly' $ do -- n <- return 3 \<\> return 2 \<\> return 1 -- S.yieldM $ do -- threadDelay (n * 1000000) @@ -911,19 +921,25 @@ wAsync = joinStreamVarAsync WAsyncVar -- ThreadId 38: Delay 3 -- @ -- --- @since 0.2.0 +-- /Since: 0.2.0 ("Streamly")/ +-- +-- @since 0.8.0 newtype WAsyncT m a = WAsyncT {getWAsyncT :: Stream m a} deriving (MonadTrans) -- | A round robin parallely composing IO stream of elements of type @a@. -- See 'WAsyncT' documentation for more details. -- --- @since 0.2.0 +-- /Since: 0.2.0 ("Streamly")/ +-- +-- @since 0.8.0 type WAsync = WAsyncT IO -- | Fix the type of a polymorphic stream as 'WAsyncT'. -- --- @since 0.2.0 +-- /Since: 0.2.0 ("Streamly")/ +-- +-- @since 0.8.0 wAsyncly :: IsStream t => WAsyncT m a -> t m a wAsyncly = adapt diff --git a/src/Streamly/Internal/Data/Stream/Combinators.hs b/src/Streamly/Internal/Data/Stream/Combinators.hs index 6910c3d594..37bbad53e6 100644 --- a/src/Streamly/Internal/Data/Stream/Combinators.hs +++ b/src/Streamly/Internal/Data/Stream/Combinators.hs @@ -48,7 +48,9 @@ import Streamly.Internal.Data.Stream.Serial (SerialT) -- When the actions are CPU bound this option can be used to -- control the amount of CPU used by the stream. -- --- @since 0.4.0 +-- /Since: 0.4.0 ("Streamly")/ +-- +-- @since 0.8.0 {-# INLINE_NORMAL maxThreads #-} maxThreads :: IsStream t => Int -> t m a -> t m a maxThreads n m = mkStream $ \st stp sng yld -> @@ -73,7 +75,9 @@ maxThreadsSerial _ = id -- applicative zip streams generates an infinite stream causing unbounded -- concurrent generation with no limit on the buffer or threads. -- --- @since 0.4.0 +-- /Since: 0.4.0 ("Streamly")/ +-- +-- @since 0.8.0 {-# INLINE_NORMAL maxBuffer #-} maxBuffer :: IsStream t => Int -> t m a -> t m a maxBuffer n m = mkStream $ \st stp sng yld -> @@ -97,7 +101,9 @@ maxBufferSerial _ = id -- * The maximum rate that the stream producer can achieve -- * The maximum rate that the stream consumer can achieve -- --- @since 0.5.0 +-- /Since: 0.5.0 ("Streamly")/ +-- +-- @since 0.8.0 {-# INLINE_NORMAL rate #-} rate :: IsStream t => Maybe Rate -> t m a -> t m a rate r m = mkStream $ \st stp sng yld -> @@ -126,7 +132,9 @@ yieldRateSerial _ = id -- go down to half of the specified rate on the lower side and double of -- the specified rate on the higher side. -- --- @since 0.5.0 +-- /Since: 0.5.0 ("Streamly")/ +-- +-- @since 0.8.0 avgRate :: IsStream t => Double -> t m a -> t m a avgRate r = rate (Just $ Rate (r/2) r (2*r) maxBound) @@ -137,7 +145,9 @@ avgRate r = rate (Just $ Rate (r/2) r (2*r) maxBound) -- specified rate, even though it may possibly go above it at times, the -- upper limit is double of the specified rate. -- --- @since 0.5.0 +-- /Since: 0.5.0 ("Streamly")/ +-- +-- @since 0.8.0 minRate :: IsStream t => Double -> t m a -> t m a minRate r = rate (Just $ Rate r r (2*r) maxBound) @@ -150,7 +160,9 @@ minRate r = rate (Just $ Rate r r (2*r) maxBound) -- applications where certain resource usage must not be allowed to go -- beyond certain limits. -- --- @since 0.5.0 +-- /Since: 0.5.0 ("Streamly")/ +-- +-- @since 0.8.0 maxRate :: IsStream t => Double -> t m a -> t m a maxRate r = rate (Just $ Rate (r/2) r r maxBound) @@ -162,7 +174,9 @@ maxRate r = rate (Just $ Rate (r/2) r r maxBound) -- applications like graphics frame refresh where we need to maintain a -- constant refresh rate. -- --- @since 0.5.0 +-- /Since: 0.5.0 ("Streamly")/ +-- +-- @since 0.8.0 constRate :: IsStream t => Double -> t m a -> t m a constRate r = rate (Just $ Rate r r r 0) diff --git a/src/Streamly/Internal/Data/Stream/IsStream.hs b/src/Streamly/Internal/Data/Stream/IsStream.hs index cd952822bc..0a152da9bd 100644 --- a/src/Streamly/Internal/Data/Stream/IsStream.hs +++ b/src/Streamly/Internal/Data/Stream/IsStream.hs @@ -19,9 +19,51 @@ module Streamly.Internal.Data.Stream.IsStream ( + -- * Stream Types + -- ** Serial Streams + SerialT + , Serial + , WSerialT + , WSerial + + -- ** Speculative Streams + , AheadT + , Ahead + + -- ** Asynchronous Streams + , AsyncT + , Async + , WAsyncT + , WAsync + , ParallelT + , Parallel + , mkAsync + + -- ** Zipping Streams + , ZipSerialM + , ZipSerial + , ZipAsyncM + , ZipAsync + + -- * Stream Type Adapters + , IsStream () + + , serially + , wSerially + , asyncly + , aheadly + , wAsyncly + , parallely + , zipSerially + , zipAsyncly + , adapt + + -- * Type Synonyms + , MonadAsync + -- * Construction -- ** Primitives - K.nil + , K.nil , K.nilM , K.cons , (K..:) @@ -415,6 +457,7 @@ module Streamly.Internal.Data.Stream.IsStream -- * Combining Streams -- ** Appending + , serial , append -- ** Interleaving @@ -423,18 +466,22 @@ module Streamly.Internal.Data.Stream.IsStream , interleaveSuffix , interleaveInfix + , wSerial , Serial.wSerialFst , Serial.wSerialMin -- ** Scheduling + , ahead + , async + , wAsync , roundrobin -- ** Parallel + , parallel , Par.parallelFst , Par.parallelMin -- ** Merging - -- , merge , mergeBy , mergeByM @@ -448,9 +495,9 @@ module Streamly.Internal.Data.Stream.IsStream , Z.zipAsyncWithM -- ** Flattening a Container of Streams - , foldWith - , foldMapWith - , forEachWith + , concatFoldableWith + , concatMapFoldableWith + , concatForFoldableWith -- ** Flattening a Stream of Streams , concat @@ -514,6 +561,18 @@ module Streamly.Internal.Data.Stream.IsStream , rights , iterateMapLeftsWith + -- * Concurrency Control + , maxThreads + , maxBuffer + + -- * Rate Limiting + , Rate (..) + , rate + , avgRate + , minRate + , maxRate + , constRate + -- * Diagnostics , inspectMode @@ -555,14 +614,26 @@ import Streamly.Internal.Data.Fold.Types (Fold (..), Fold2 (..)) import Streamly.Internal.Data.Parser (Parser (..)) import Streamly.Internal.Data.Unfold.Types (Unfold) import Streamly.Internal.Data.Array.Storable.Foreign.Types (Array, writeNUnsafe) -import Streamly.Internal.Data.SVar (MonadAsync, defState, Rate) -import Streamly.Internal.Data.Stream.Combinators (inspectMode, maxYields) +import Streamly.Internal.Data.SVar (MonadAsync, defState, Rate (..)) +import Streamly.Internal.Data.Stream.Ahead (AheadT, Ahead, ahead, aheadly) +import Streamly.Internal.Data.Stream.Async + ( AsyncT, Async, WAsyncT, WAsync, mkAsync, async, asyncly, wAsync + , wAsyncly) +import Streamly.Internal.Data.Stream.Combinators + ( inspectMode, maxBuffer, maxThreads, maxYields, rate, avgRate, minRate + , maxRate, constRate) +import Streamly.Internal.Data.Stream.Parallel + ( ParallelT, Parallel, parallel, parallely) import Streamly.Internal.Data.Stream.Prelude - (fromStreamS, toStreamS, foldWith, foldMapWith, forEachWith) + (fromStreamS, toStreamS, concatFoldableWith, concatMapFoldableWith + , concatForFoldableWith) import Streamly.Internal.Data.Stream.StreamD (fromStreamD, toStreamD) -import Streamly.Internal.Data.Stream.StreamK (IsStream((|:), consM)) -import Streamly.Internal.Data.Stream.Serial (SerialT, WSerialT) -import Streamly.Internal.Data.Stream.Zip (ZipSerialM) +import Streamly.Internal.Data.Stream.StreamK (IsStream((|:), consM), adapt) +import Streamly.Internal.Data.Stream.Serial + ( SerialT, WSerialT, Serial, WSerial, serial, wSerial, serially + , wSerially) +import Streamly.Internal.Data.Stream.Zip + ( ZipSerialM, ZipSerial, ZipAsyncM, ZipAsync, zipSerially, zipAsyncly) import Streamly.Internal.Data.Pipe.Types (Pipe (..)) import Streamly.Internal.Data.Time.Units ( AbsTime, MilliSecond64(..), addToAbsTime, toRelTime @@ -918,9 +989,9 @@ iterate step = fromStreamS . S.iterate step -- -- /Concurrent/ -- --- /Since: 0.7.0 (signature change)/ --- -- /Since: 0.1.2/ +-- +-- /Since: 0.7.0 (signature change)/ {-# INLINE_EARLY iterateM #-} iterateM :: (IsStream t, MonadAsync m) => (a -> m a) -> m a -> t m a iterateM = K.iterateM @@ -1997,7 +2068,9 @@ infixl 1 |&. -- -- /Concurrent/ -- --- @since 0.3.0 +-- /Since: 0.3.0 ("Streamly")/ +-- +-- @since 0.8.0 {-# INLINE (|$) #-} (|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> (t m a -> t m b) -- (|$) f = f . Async.mkAsync @@ -2024,7 +2097,9 @@ applyAsync = (|$) -- -- /Concurrent/ -- --- @since 0.3.0 +-- /Since: 0.3.0 ("Streamly")/ +-- +-- @since 0.8.0 {-# INLINE (|&) #-} (|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b x |& f = f |$ x @@ -2049,7 +2124,9 @@ x |& f = f |$ x -- -- /Concurrent/ -- --- @since 0.3.0 +-- /Since: 0.3.0 ("Streamly")/ +-- +-- @since 0.8.0 {-# INLINE (|$.) #-} (|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> (t m a -> m b) -- (|$.) f = f . Async.mkAsync @@ -2073,7 +2150,9 @@ foldAsync = (|$.) -- -- /Concurrent/ -- --- @since 0.3.0 +-- /Since: 0.3.0 ("Streamly")/ +-- +-- @since 0.8.0 {-# INLINE (|&.) #-} (|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b x |&. f = f |$. x @@ -2109,9 +2188,9 @@ transform pipe xs = fromStreamD $ D.transform pipe (toStreamD xs) -- designed to work with the @foldl@ library. The suffix @x@ is a mnemonic for -- extraction. -- --- /Since: 0.7.0 (Monad m constraint)/ --- -- /Since 0.2.0/ +-- +-- /Since: 0.7.0 (Monad m constraint)/ {-# DEPRECATED scanx "Please use scanl followed by map instead." #-} {-# INLINE scanx #-} scanx :: (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b diff --git a/src/Streamly/Internal/Data/Stream/Parallel.hs b/src/Streamly/Internal/Data/Stream/Parallel.hs index 5fd2701489..b80b4d412c 100644 --- a/src/Streamly/Internal/Data/Stream/Parallel.hs +++ b/src/Streamly/Internal/Data/Stream/Parallel.hs @@ -249,7 +249,9 @@ infixr 6 `parallel` -- -- `nilM` is currently @Internal@. -- --- @since 0.2.0 +-- /Since: 0.2.0 ("Streamly")/ +-- +-- @since 0.8.0 {-# INLINE parallel #-} parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a parallel = joinStreamVarPar ParallelVar StopNone @@ -451,7 +453,7 @@ distributeAsync_ = flip (foldr tapAsync) -- efficient than 'ParallelT'. -- -- @ --- main = ('toList' . 'parallely' $ (fromFoldable [1,2]) \<> (fromFoldable [3,4])) >>= print +-- main = (S.'toList' . S.'parallely' $ (S.fromFoldable [1,2]) \<> (S.fromFoldable [3,4])) >>= print -- @ -- @ -- [1,3,2,4] @@ -471,11 +473,10 @@ distributeAsync_ = flip (foldr tapAsync) -- of the loop concurrently. -- -- @ --- import "Streamly" -- import qualified "Streamly.Prelude" as S -- import Control.Concurrent -- --- main = 'drain' . 'parallely' $ do +-- main = S.'drain' . S.'parallely' $ do -- n <- return 3 \<\> return 2 \<\> return 1 -- S.yieldM $ do -- threadDelay (n * 1000000) @@ -490,21 +491,27 @@ distributeAsync_ = flip (foldr tapAsync) -- Note that parallel composition can only combine a finite number of -- streams as it needs to retain state for each unfinished stream. -- +-- /Since: 0.1.0 ("Streamly")/ +-- -- /Since: 0.7.0 (maxBuffer applies to ParallelT streams)/ -- --- /Since: 0.1.0/ +-- @since 0.8.0 newtype ParallelT m a = ParallelT {getParallelT :: Stream m a} deriving (MonadTrans) -- | A parallely composing IO stream of elements of type @a@. -- See 'ParallelT' documentation for more details. -- --- @since 0.2.0 +-- /Since: 0.2.0 ("Streamly")/ +-- +-- @since 0.8.0 type Parallel = ParallelT IO -- | Fix the type of a polymorphic stream as 'ParallelT'. -- --- @since 0.1.0 +-- /Since: 0.1.0 ("Streamly")/ +-- +-- @since 0.8.0 parallely :: IsStream t => ParallelT m a -> t m a parallely = adapt diff --git a/src/Streamly/Internal/Data/Stream/Prelude.hs b/src/Streamly/Internal/Data/Stream/Prelude.hs index 066dc9295c..8db54cd0a0 100644 --- a/src/Streamly/Internal/Data/Stream/Prelude.hs +++ b/src/Streamly/Internal/Data/Stream/Prelude.hs @@ -58,9 +58,9 @@ module Streamly.Internal.Data.Stream.Prelude , K.concatMap -- * Fold Utilities - , foldWith - , foldMapWith - , forEachWith + , concatFoldableWith + , concatMapFoldableWith + , concatForFoldableWith ) where @@ -292,58 +292,59 @@ foldbWith :: IsStream t foldbWith f = K.foldb f K.nil -} --- /Since: 0.7.0 ("Streamly.Prelude")/ --- + -- | A variant of 'Data.Foldable.fold' that allows you to fold a 'Foldable' -- container of streams using the specified stream sum operation. -- --- @foldWith 'async' $ map return [1..3]@ +-- @concatFoldableWith 'async' $ map return [1..3]@ -- -- Equivalent to: -- -- @ --- foldWith f = S.foldMapWith f id +-- concatFoldableWith f = S.concatMapFoldableWith f id -- @ -- +-- /Since: 0.8.0 (Renamed foldWith to concatFoldableWith)/ +-- -- /Since: 0.1.0 ("Streamly")/ -{-# INLINABLE foldWith #-} -foldWith :: (IsStream t, Foldable f) +{-# INLINABLE concatFoldableWith #-} +concatFoldableWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a -foldWith f = Prelude.foldr f K.nil +concatFoldableWith f = Prelude.foldr f K.nil --- /Since: 0.7.0 ("Streamly.Prelude")/ --- -- | A variant of 'foldMap' that allows you to map a monadic streaming action -- on a 'Foldable' container and then fold it using the specified stream merge -- operation. -- --- @foldMapWith 'async' return [1..3]@ +-- @concatMapFoldableWith 'async' return [1..3]@ -- -- Equivalent to: -- -- @ --- foldMapWith f g xs = S.concatMapWith f g (S.fromFoldable xs) +-- concatMapFoldableWith f g xs = S.concatMapWith f g (S.fromFoldable xs) -- @ -- +-- /Since: 0.8.0 (Renamed foldMapWith to concatMapFoldableWith)/ +-- -- /Since: 0.1.0 ("Streamly")/ -{-# INLINABLE foldMapWith #-} -foldMapWith :: (IsStream t, Foldable f) +{-# INLINABLE concatMapFoldableWith #-} +concatMapFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b -foldMapWith f g = Prelude.foldr (f . g) K.nil +concatMapFoldableWith f g = Prelude.foldr (f . g) K.nil --- /Since: 0.7.0 ("Streamly.Prelude")/ --- --- | Like 'foldMapWith' but with the last two arguments reversed i.e. the +-- | Like 'concatMapFoldableWith' but with the last two arguments reversed i.e. the -- monadic streaming function is the last argument. -- -- Equivalent to: -- -- @ --- forEachWith = flip S.foldMapWith +-- concatForFoldableWith = flip S.concatMapFoldableWith -- @ -- +-- /Since: 0.8.0 (Renamed forEachWith to concatForFoldableWith)/ +-- -- /Since: 0.1.0 ("Streamly")/ -{-# INLINABLE forEachWith #-} -forEachWith :: (IsStream t, Foldable f) +{-# INLINABLE concatForFoldableWith #-} +concatForFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b -forEachWith f xs g = Prelude.foldr (f . g) K.nil xs +concatForFoldableWith f xs g = Prelude.foldr (f . g) K.nil xs diff --git a/src/Streamly/Internal/Data/Stream/Serial.hs b/src/Streamly/Internal/Data/Stream/Serial.hs index b9d73af0c9..98705b6547 100644 --- a/src/Streamly/Internal/Data/Stream/Serial.hs +++ b/src/Streamly/Internal/Data/Stream/Serial.hs @@ -137,7 +137,9 @@ import Prelude hiding (map, mapM, errorWithoutStackTrace) -- In the code above, the 'serially' combinator can be omitted as the default -- stream type is 'SerialT'. -- --- @since 0.2.0 +-- /Since: 0.2.0 ("Streamly")/ +-- +-- @since 0.8.0 newtype SerialT m a = SerialT {getSerialT :: Stream m a} -- XXX when deriving do we inherit an INLINE? deriving (Semigroup, Monoid, MonadTrans) @@ -145,7 +147,9 @@ newtype SerialT m a = SerialT {getSerialT :: Stream m a} -- | A serial IO stream of elements of type @a@. See 'SerialT' documentation -- for more details. -- --- @since 0.2.0 +-- /Since: 0.2.0 ("Streamly")/ +-- +-- @since 0.8.0 type Serial = SerialT IO -- | @@ -155,7 +159,9 @@ type StreamT = SerialT -- | Fix the type of a polymorphic stream as 'SerialT'. -- --- @since 0.1.0 +-- /Since: 0.1.0 ("Streamly")/ +-- +-- @since 0.8.0 serially :: IsStream t => SerialT m a -> t m a serially = adapt @@ -309,14 +315,18 @@ TRAVERSABLE_INSTANCE(SerialT) -- (2,4) -- @ -- --- @since 0.2.0 +-- /Since: 0.2.0 ("Streamly")/ +-- +-- @since 0.8.0 newtype WSerialT m a = WSerialT {getWSerialT :: Stream m a} deriving (MonadTrans) -- | An interleaving serial IO stream of elements of type @a@. See 'WSerialT' -- documentation for more details. -- --- @since 0.2.0 +-- /Since: 0.2.0 ("Streamly")/ +-- +-- @since 0.8.0 type WSerial = WSerialT IO -- | @@ -326,7 +336,9 @@ type InterleavedT = WSerialT -- | Fix the type of a polymorphic stream as 'WSerialT'. -- --- @since 0.2.0 +-- /Since: 0.2.0 ("Streamly")/ +-- +-- @since 0.8.0 wSerially :: IsStream t => WSerialT m a -> t m a wSerially = adapt @@ -372,7 +384,9 @@ infixr 6 `wSerial` -- When one stream stops the rest of the other stream is used in the output -- stream. -- --- @since 0.2.0 +-- @since 0.8.0 +-- +-- /Since: 0.2.0 ("Streamly")/ {-# INLINE wSerial #-} wSerial :: IsStream t => t m a -> t m a -> t m a wSerial m1 m2 = mkStream $ \st yld sng stp -> do diff --git a/src/Streamly/Internal/Data/Stream/StreamK/Type.hs b/src/Streamly/Internal/Data/Stream/StreamK/Type.hs index 97af8cad1a..f9e166e467 100644 --- a/src/Streamly/Internal/Data/Stream/StreamK/Type.hs +++ b/src/Streamly/Internal/Data/Stream/StreamK/Type.hs @@ -132,7 +132,9 @@ infixr 5 |: -- | Class of types that can represent a stream of elements of some type 'a' in -- some monad 'm'. -- --- @since 0.2.0 +-- /Since: 0.2.0 ("Streamly")/ +-- +-- @since 0.8.0 class #if __GLASGOW_HASKELL__ >= 806 ( forall m a. MonadAsync m => Semigroup (t m a) @@ -198,7 +200,9 @@ type Streaming = IsStream -- -- | Adapt any specific stream type to any other specific stream type. -- --- @since 0.1.0 +-- /Since: 0.1.0 ("Streamly")/ +-- +-- @since 0.8.0 adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a adapt = fromStream . toStream @@ -761,7 +765,9 @@ infixr 6 `serial` -- Appends two streams sequentially, yielding all elements from the first -- stream, and then all elements from the second stream. -- --- @since 0.2.0 +-- /Since: 0.2.0 ("Streamly")/ +-- +-- @since 0.8.0 {-# INLINE serial #-} serial :: IsStream t => t m a -> t m a -> t m a -- XXX This doubles the time of toNullAp benchmark, may not be fusing properly diff --git a/src/Streamly/Internal/Data/Stream/Zip.hs b/src/Streamly/Internal/Data/Stream/Zip.hs index 06ae6b5c0f..0426003030 100644 --- a/src/Streamly/Internal/Data/Stream/Zip.hs +++ b/src/Streamly/Internal/Data/Stream/Zip.hs @@ -130,7 +130,9 @@ zipAsyncWith f = zipAsyncWithM (\a b -> return (f a b)) -- The 'Semigroup' instance of this type works the same way as that of -- 'SerialT'. -- --- @since 0.2.0 +-- /Since: 0.2.0 ("Streamly")/ +-- +-- @since 0.8.0 newtype ZipSerialM m a = ZipSerialM {getZipSerialM :: Stream m a} deriving (Semigroup, Monoid) @@ -141,12 +143,16 @@ type ZipStream = ZipSerialM -- | An IO stream whose applicative instance zips streams serially. -- --- @since 0.2.0 +-- /Since: 0.2.0 ("Streamly")/ +-- +-- @since 0.8.0 type ZipSerial = ZipSerialM IO -- | Fix the type of a polymorphic stream as 'ZipSerialM'. -- --- @since 0.2.0 +-- /Since: 0.2.0 ("Streamly")/ +-- +-- @since 0.8.0 zipSerially :: IsStream t => ZipSerialM m a -> t m a zipSerially = K.adapt @@ -209,18 +215,24 @@ TRAVERSABLE_INSTANCE(ZipSerialM) -- The 'Semigroup' instance of this type works the same way as that of -- 'SerialT'. -- --- @since 0.2.0 +-- /Since: 0.2.0 ("Streamly")/ +-- +-- @since 0.8.0 newtype ZipAsyncM m a = ZipAsyncM {getZipAsyncM :: Stream m a} deriving (Semigroup, Monoid) -- | An IO stream whose applicative instance zips streams wAsyncly. -- --- @since 0.2.0 +-- /Since: 0.2.0 ("Streamly")/ +-- +-- @since 0.8.0 type ZipAsync = ZipAsyncM IO -- | Fix the type of a polymorphic stream as 'ZipAsyncM'. -- --- @since 0.2.0 +-- /Since: 0.2.0 ("Streamly")/ +-- +-- @since 0.8.0 zipAsyncly :: IsStream t => ZipAsyncM m a -> t m a zipAsyncly = K.adapt diff --git a/src/Streamly/Internal/FileSystem/Event/Darwin.hs b/src/Streamly/Internal/FileSystem/Event/Darwin.hs index 61231e0111..36620f5df6 100644 --- a/src/Streamly/Internal/FileSystem/Event/Darwin.hs +++ b/src/Streamly/Internal/FileSystem/Event/Darwin.hs @@ -162,7 +162,7 @@ import Foreign.Marshal.Array (withArray) import Foreign.Ptr (Ptr, castPtr) import Foreign.Storable (Storable(..)) import GHC.IO.Handle.FD (fdToHandle) -import Streamly (SerialT) +import Streamly.Prelude (SerialT) import Streamly.Internal.Data.Cont (contListMap) import Streamly.Internal.Data.Parser (Parser) import Streamly.Internal.Data.Array.Storable.Foreign.Types (Array(..)) diff --git a/src/Streamly/Internal/FileSystem/Event/Linux.hs b/src/Streamly/Internal/FileSystem/Event/Linux.hs index 00b2a7a41e..dd980c6aa3 100644 --- a/src/Streamly/Internal/FileSystem/Event/Linux.hs +++ b/src/Streamly/Internal/FileSystem/Event/Linux.hs @@ -159,7 +159,7 @@ import Foreign.Storable (peek, peekByteOff, sizeOf) import GHC.IO.Device (IODeviceType(Stream)) import GHC.IO.FD (fdFD, mkFD) import GHC.IO.Handle.FD (mkHandleFromFD) -import Streamly (SerialT) +import Streamly.Prelude (SerialT) import Streamly.Internal.Data.Parser (Parser) import Streamly.Internal.Data.Array.Storable.Foreign.Types (Array(..)) import System.IO (Handle, hClose, IOMode(ReadMode)) diff --git a/src/Streamly/Internal/FileSystem/Handle.hs b/src/Streamly/Internal/FileSystem/Handle.hs index 6a36c989c8..6b97d93b71 100644 --- a/src/Streamly/Internal/FileSystem/Handle.hs +++ b/src/Streamly/Internal/FileSystem/Handle.hs @@ -109,7 +109,7 @@ import GHC.ForeignPtr (mallocPlainForeignPtrBytes) import System.IO (Handle, hGetBufSome, hPutBuf, stdin, stdout) import Prelude hiding (read) -import Streamly (MonadAsync) +import Streamly.Prelude (MonadAsync) import Streamly.Data.Fold (Fold) import Streamly.Internal.Data.Fold.Types (Fold2(..)) import Streamly.Internal.Data.Unfold.Types (Unfold(..)) diff --git a/src/Streamly/Internal/Network/Inet/TCP.hs b/src/Streamly/Internal/Network/Inet/TCP.hs index a934f0f45e..b0068ba31a 100644 --- a/src/Streamly/Internal/Network/Inet/TCP.hs +++ b/src/Streamly/Internal/Network/Inet/TCP.hs @@ -103,7 +103,7 @@ import Network.Socket socket) import Prelude hiding (read) -import Streamly (MonadAsync) +import Streamly.Prelude (MonadAsync) import Streamly.Internal.Data.Fold.Types (Fold(..)) import Streamly.Internal.Data.SVar (fork) import Streamly.Internal.Data.Unfold.Types (Unfold(..)) diff --git a/src/Streamly/Internal/Network/Socket.hs b/src/Streamly/Internal/Network/Socket.hs index 4ce5e9820d..62b493c9b2 100644 --- a/src/Streamly/Internal/Network/Socket.hs +++ b/src/Streamly/Internal/Network/Socket.hs @@ -87,7 +87,7 @@ import Prelude hiding (read) import qualified Network.Socket as Net -import Streamly (MonadAsync) +import Streamly.Prelude (MonadAsync) import Streamly.Internal.Data.Unfold.Types (Unfold(..)) import Streamly.Internal.Data.Array.Storable.Foreign.Types (Array(..), lpackArraysChunksOf) import Streamly.Internal.Data.Array.Storable.Foreign.Mut.Types (mutableArray) diff --git a/src/Streamly/Internal/Unicode/Array/Char.hs b/src/Streamly/Internal/Unicode/Array/Char.hs index a5913c09c2..aa11ef595a 100644 --- a/src/Streamly/Internal/Unicode/Array/Char.hs +++ b/src/Streamly/Internal/Unicode/Array/Char.hs @@ -18,9 +18,10 @@ module Streamly.Internal.Unicode.Array.Char where import Control.Monad.IO.Class (MonadIO) -import Streamly (IsStream, MonadAsync) +import Streamly.Prelude (MonadAsync) import Prelude hiding (String, lines, words, unlines, unwords) import Streamly.Data.Array.Storable.Foreign (Array) +import Streamly.Internal.Data.Stream.IsStream (IsStream) import qualified Streamly.Internal.Unicode.Stream as S import qualified Streamly.Data.Array.Storable.Foreign as A diff --git a/src/Streamly/Internal/Unicode/Array/Prim/Pinned.hs b/src/Streamly/Internal/Unicode/Array/Prim/Pinned.hs index f8ad765f9e..6fd059b28f 100644 --- a/src/Streamly/Internal/Unicode/Array/Prim/Pinned.hs +++ b/src/Streamly/Internal/Unicode/Array/Prim/Pinned.hs @@ -18,8 +18,9 @@ module Streamly.Internal.Unicode.Array.Prim.Pinned where import Control.Monad.IO.Class (MonadIO(..)) -import Streamly (IsStream, MonadAsync) +import Streamly.Prelude (MonadAsync) import Prelude hiding (String, lines, words, unlines, unwords) +import Streamly.Internal.Data.Stream.IsStream (IsStream) import Streamly.Internal.Data.Array.Prim.Pinned (Array) import qualified Streamly.Internal.Unicode.Stream as S diff --git a/src/Streamly/Internal/Unicode/Char.hs b/src/Streamly/Internal/Unicode/Char.hs index f3594e82dc..4a54562be6 100644 --- a/src/Streamly/Internal/Unicode/Char.hs +++ b/src/Streamly/Internal/Unicode/Char.hs @@ -24,7 +24,7 @@ where import Data.Char (isAsciiUpper, isAsciiLower) --- import Streamly (IsStream) +-- import Streamly.Prelude (IsStream) ------------------------------------------------------------------------------- -- Unicode aware operations on strings diff --git a/src/Streamly/Internal/Unicode/Stream.hs b/src/Streamly/Internal/Unicode/Stream.hs index fa81d3a2c9..4ef64b5f7b 100644 --- a/src/Streamly/Internal/Unicode/Stream.hs +++ b/src/Streamly/Internal/Unicode/Stream.hs @@ -66,11 +66,11 @@ import GHC.ForeignPtr (ForeignPtr (..)) import GHC.IO.Encoding.Failure (isSurrogate) import GHC.Ptr (Ptr (..), plusPtr) import System.IO.Unsafe (unsafePerformIO) -import Streamly (IsStream) import Streamly.Data.Fold (Fold) import Streamly.Data.Array.Storable.Foreign (Array) import Streamly.Internal.Data.Unfold (Unfold) import Streamly.Internal.Data.SVar (adaptState) +import Streamly.Internal.Data.Stream.IsStream (IsStream) import Streamly.Internal.Data.Stream.StreamD (Stream(..), Step (..)) import Streamly.Internal.Data.Tuple.Strict (Tuple'(..)) diff --git a/src/Streamly/Prelude.hs b/src/Streamly/Prelude.hs index 60e26b5105..3c473ff8da 100644 --- a/src/Streamly/Prelude.hs +++ b/src/Streamly/Prelude.hs @@ -34,11 +34,62 @@ -- Deconstruction and folds accept a 'SerialT' type instead of a polymorphic -- type to ensure that streams always have a concrete monomorphic type by -- default, reducing type errors. In case you want to use any other type of --- stream you can use one of the type combinators provided in the "Streamly" --- module to convert the stream type. +-- stream you can use one of the type combinators provided to convert the +-- stream type. module Streamly.Prelude ( + -- * Stream Types + -- $streamtypes + + -- ** Serial Streams + -- $serial + SerialT + , WSerialT + + -- ** Speculative Streams + -- $ahead + , AheadT + + -- ** Asynchronous Streams + -- $async + , AsyncT + , WAsyncT + , ParallelT + , mkAsync + + -- ** Zipping Streams + -- $zipping + , ZipSerialM + , ZipAsyncM + + -- * IO Streams + , Serial + , WSerial + , Ahead + , Async + , WAsync + , Parallel + , ZipSerial + , ZipAsync + + -- * Type Synonyms + , MonadAsync + + -- * Stream Type Adapters + -- $adapters + , IsStream () + + , serially + , wSerially + , asyncly + , aheadly + , wAsyncly + , parallely + , zipSerially + , zipAsyncly + , adapt + -- * Construction -- ** Primitives -- | Primitives to construct a stream from pure values or monadic actions. @@ -47,7 +98,7 @@ module Streamly.Prelude -- versions provided in this module can be much more efficient in most -- cases. Users can create custom combinators using these primitives. - nil + , nil , cons , (.:) @@ -263,6 +314,13 @@ module Streamly.Prelude -- trimming sequences , stripPrefix + -- * Parallel Function Application + -- $application + , (|$) + , (|&) + , (|$.) + , (|&.) + -- * Transformation -- ** Mapping @@ -464,6 +522,16 @@ module Streamly.Prelude -- >> S.toList $ fold $ [S.fromList [1,2], S.fromList [3,4]] -- [1,2,3,4] -- @ + , serial + + -- ** Interleaving + , wSerial + + -- ** Scheduling + , ahead + , async + , wAsync + , parallel -- ** Merging -- | Streams form a commutative semigroup under the merge @@ -498,18 +566,6 @@ module Streamly.Prelude , zipAsyncWith , zipAsyncWithM - {- - -- ** Folding Containers of Streams - -- | These are variants of standard 'Foldable' fold functions that use a - -- polymorphic stream sum operation (e.g. 'async' or 'wSerial') to fold a - -- finite container of streams. Note that these are just special cases of - -- the more general 'concatMapWith' operation. - -- - , foldMapWith - , forEachWith - , foldWith - -} - -- ** Folding Streams of Streams -- | Stream operations like map and filter represent loop processing in -- imperative programming terms. Similarly, the imperative concept of @@ -539,6 +595,16 @@ module Streamly.Prelude , concatMapM , concatUnfold + -- ** Folding Containers of Streams + -- | These are variants of standard 'Foldable' fold functions that use a + -- polymorphic stream sum operation (e.g. 'async' or 'wSerial') to fold a + -- finite container of streams. Note that these are just special cases of + -- the more general 'concatMapWith' operation. + -- + , concatFoldableWith + , concatMapFoldableWith + , concatForFoldableWith + -- * Exceptions , before , after @@ -547,6 +613,19 @@ module Streamly.Prelude , finally , handle + -- * Concurrency Control + -- $concurrency + , maxThreads + , maxBuffer + + -- * Rate Limiting + , Rate (..) + , rate + , avgRate + , minRate + , maxRate + , constRate + -- * Deprecated , once , each @@ -571,9 +650,124 @@ import Prelude import Streamly.Internal.Data.Stream.IsStream + +-- $streamtypes +-- The basic stream type is 'Serial', it represents a sequence of IO actions, +-- and is a 'Monad'. The type 'SerialT' is a monad transformer that can +-- represent a sequence of actions in an arbitrary monad. The type 'Serial' is +-- in fact a synonym for @SerialT IO@. There are a few more types similar to +-- 'SerialT', all of them represent a stream and differ only in the +-- 'Semigroup', 'Applicative' and 'Monad' compositions of the stream. 'Serial' +-- and 'WSerial' types compose serially whereas 'Async' and 'WAsync' +-- types compose concurrently. All these types can be freely inter-converted +-- using type combinators without any cost. You can freely switch to any type +-- of composition at any point in the program. When no type annotation or +-- explicit stream type combinators are used, the default stream type is +-- inferred as 'Serial'. + +-- $serial +-- +-- When a stream consumer demands an element from a serial stream constructed +-- as @a \`consM` b \`consM` ... nil@, the action @a@ at the head of the stream +-- sequence is executed and the result is supplied to the consumer. When the +-- next element is demanded, the action @b@ is executed and its result is +-- supplied. Thus, the effects are performed and results are consumed strictly +-- in a serial order. Serial streams can be considered as /spatially ordered/ +-- streams as the order of execution and consumption is the same as the spatial +-- order in which the actions are composed by the programmer. +-- +-- Serial streams enforce the side effects as well as the results of the +-- actions to be in the same order in which the actions are added to the +-- stream. Therefore, the semigroup operation for serial streams is not +-- commutative: +-- +-- @ +-- a <> b is not the same as b <> a +-- @ +-- +-- There are two serial stream types 'SerialT' and 'WSerialT'. The stream +-- evaluation of both the variants works in the same way as described above, +-- they differ only in the 'Semigroup' and 'Monad' implementaitons. + +-- $ahead +-- +-- When a stream consumer demands an element from a speculative stream +-- constructed as @a \`consM` b \`consM` ... nil@, the action @a@ at the head +-- of the stream is executed and the output of the action is supplied to the +-- consumer. However, in addition to the action at the head multiple actions +-- following it may also be executed concurrently and the results buffered. +-- When the next element is demanded it may be served from the buffer and we +-- may execute the next action in the sequence to keep the buffer adequately +-- filled. Thus, the actions are executed concurrently but results consumed in +-- serial order just like serial streams. `consM` can be used to fold an +-- infinite lazy container of effects, as the number of concurrent executions +-- is limited. +-- +-- Similar to 'consM', the monadic stream generation (e.g. replicateM) and +-- transformation operations (e.g. mapM) on speculative streams can execute +-- multiple effects concurrently in a speculative manner. +-- +-- How many effects can be executed concurrently and how many results can be +-- buffered are controlled by 'maxThreads' and 'maxBuffer' combinators +-- respectively. The actual number of concurrent threads is adjusted according +-- to the rate at which the consumer is consuming the stream. It may even +-- execute actions serially in a single thread if that is enough to match the +-- consumer's speed. +-- +-- Speculative streams enforce ordering of the results of actions in the stream +-- but the side effects are only partially ordered. Therefore, the semigroup +-- operation for speculative streams is not commutative from the pure outputs +-- perspective but commutative from side effects perspective. + +-- $async +-- +-- /Scheduling and execution:/ In an asynchronous stream @a \`consM` b \`consM` +-- c ...@, the actions @a@, @b@, and @c@ are executed concurrently with the +-- consumer of the stream. The actions are /scheduled/ for execution in the +-- same order as they are specified in the stream. Multiple scheduled actions +-- may be /executed/ concurrently in parallel threads of execution. The +-- actions may be executed out of order and they may complete at arbitrary +-- times. Therefore, the /effects/ of the actions may be observed out of +-- order. +-- +-- /Buffering:/ The /results/ from multiple threads of execution are queued in +-- a buffer as soon as they become available. The consumer of the stream is +-- served from this buffer. Therefore, the consumer may observe the results to +-- be out of order. In other words, an asynchronous stream is an unordered +-- stream i.e. order does not matter. +-- +-- /Concurrency control:/ Threads are suspended if the `maxBuffer` limit is +-- reached, and resumed when the consumer makes space in the buffer. The +-- maximum number of concurrent threads depends on `maxThreads`. Number of +-- threads is increased or decreased based on the speed of the consumer. +-- +-- /Generation operations:/ Concurrent stream generation operations e.g. +-- 'Streamly.Prelude.replicateM' when used in async style schedule and execute +-- the stream generating actions in the manner described above. The generation +-- actions run concurrently, effects and results of the actions as observed by +-- the consumer of the stream may be out of order. +-- +-- /Transformation operations:/ Concurrent stream transformation operations +-- e.g. 'Streamly.Prelude.mapM', when used in async style, schedule and +-- execute transformation actions in the manner described above. Transformation +-- actions run concurrently, effects and results of the actions may be +-- observed by the consumer out of order. +-- +-- /Variants:/ There are two asynchronous stream types 'AsyncT' and 'WAsyncT'. +-- They are identical with respect to single stream evaluation behavior. Their +-- behaviors differ in how they combine multiple streams using 'Semigroup' or +-- 'Monad' composition. Since the order of elements does not matter in +-- asynchronous streams the 'Semigroup' operation is effectively commutative. + +-- $zipping +-- +-- 'ZipSerialM' and 'ZipAsyncM', provide 'Applicative' instances for zipping the +-- corresponding elements of two streams together. Note that these types are +-- not monads. + -- $rightfolds -- --- Let's take a closer look at the @foldr@ definition for lists, as given +-- Let's take a closer look at the @foldr@ definition for lists, as given3 -- earlier: -- -- @ @@ -802,3 +996,67 @@ import Streamly.Internal.Data.Stream.IsStream -- @ -- [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0] -- @ + +-- $application +-- +-- Stream processing functions can be composed in a chain using function +-- application with or without the '$' operator, or with reverse function +-- application operator '&'. Streamly provides concurrent versions of these +-- operators applying stream processing functions such that each stage of the +-- stream can run in parallel. The operators start with a @|@; we can read '|$' +-- as "@parallel dollar@" to remember that @|@ comes before '$'. +-- +-- Imports for the code snippets below: +-- +-- @ +-- import qualified Streamly.Prelude as S +-- import Control.Concurrent +-- @ + +-- $concurrency +-- +-- These combinators can be used at any point in a stream composition to set +-- parameters to control the concurrency of the /argument stream/. A control +-- parameter set at any point remains effective for any concurrent combinators +-- used in the argument stream until it is reset by using the combinator again. +-- These control parameters have no effect on non-concurrent combinators in the +-- stream, or on non-concurrent streams. +-- +-- /Pitfall:/ Remember that 'maxBuffer' in the following example applies to +-- 'mapM' and any other combinators that may follow it, and it does not apply +-- to the combinators before it: +-- +-- @ +-- ... +-- $ S.maxBuffer 10 +-- $ S.mapM ... +-- ... +-- @ +-- +-- If we use '&' instead of '$' the situation will reverse, in the following +-- example, 'maxBuffer' does not apply to 'mapM', it applies to combinators +-- that come before it, because those are the arguments to 'maxBuffer': +-- +-- @ +-- ... +-- & S.maxBuffer 10 +-- & S.mapM ... +-- ... +-- @ + +-- $adapters +-- +-- You may want to use different stream composition styles at different points +-- in your program. Stream types can be freely converted or adapted from one +-- type to another. The 'IsStream' type class facilitates type conversion of +-- one stream type to another. It is not used directly, instead the type +-- combinators provided below are used for conversions. +-- +-- To adapt from one monomorphic type (e.g. 'AsyncT') to another monomorphic +-- type (e.g. 'SerialT') use the 'adapt' combinator. To give a polymorphic code +-- a specific interpretation or to adapt a specific type to a polymorphic type +-- use the type specific combinators e.g. 'asyncly' or 'wSerially'. You +-- cannot adapt polymorphic code to polymorphic code, as the compiler would not know +-- which specific type you are converting from or to. If you see a an +-- @ambiguous type variable@ error then most likely you are using 'adapt' +-- unnecessarily on polymorphic code. diff --git a/src/Streamly/Tutorial.hs b/src/Streamly/Tutorial.hs index c0b375d48f..1ec69d22fe 100644 --- a/src/Streamly/Tutorial.hs +++ b/src/Streamly/Tutorial.hs @@ -170,7 +170,6 @@ module Streamly.Tutorial ) where -import Streamly hiding (foldWith, foldMapWith, forEachWith) import Streamly.Prelude import Data.Semigroup import Control.Applicative @@ -345,8 +344,7 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- GHCi session and import these lines to start playing. -- -- @ --- > import "Streamly" --- > import "Streamly.Prelude" ((|:)) +-- > import "Streamly.Prelude" ((|:), (|&)) -- > import qualified "Streamly.Prelude" as S -- -- > import Control.Concurrent @@ -412,15 +410,19 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- -- @ -- > let p n = threadDelay (n * 1000000) >> return n --- > S.'toList' $ 'parallely' $ p 3 |: p 2 |: p 1 |: S.'nil' +-- > :set +s +-- > S.'toList' $ S.'parallely' $ p 3 |: p 2 |: p 1 |: S.'nil' -- [1,2,3] --- > S.'toList' $ 'aheadly' $ p 3 |: p 2 |: p 1 |: S.'nil' +-- (3.01 secs, 2,018,432 bytes) +-- > S.'toList' $ S.'aheadly' $ p 3 |: p 2 |: p 1 |: S.'nil' -- [3,2,1] +-- (3.01 secs, 2,055,880 bytes) -- @ -- The following finishes in 10 seconds (100 seconds when serial): -- -- @ --- > S.drain $ 'asyncly' $ S.'replicateM' 10 $ p 10 +-- > S.'drain' $ S.'asyncly' $ S.'replicateM' 10 $ p 10 +-- (10.01 secs, 2,080,320 bytes) -- @ -- @@ -456,7 +458,6 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- numbers them and prints them: -- -- @ --- import "Streamly" -- import qualified "Streamly.Prelude" as S -- import Data.Char (toUpper) -- import Data.Function ((&)) @@ -482,7 +483,7 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- -- @ -- > let p n = threadDelay (n * 1000000) >> return n --- > S.'drain' $ S.aheadly $ S.'mapM' (\\x -> p 1 >> print x) (serially $ S.repeatM (p 1)) +-- > S.'drain' $ S.'aheadly' $ S.'mapM' (\\x -> p 1 >> print x) (S.'serially' $ S.'repeatM' (p 1)) -- @ -- @@ -515,7 +516,6 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- not explicitly specified use the imports shown below. -- -- @ --- import "Streamly" -- import "Streamly.Prelude" ((|:), nil) -- import qualified "Streamly.Prelude" as S -- @@ -553,7 +553,7 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- 3, 4: -- -- @ --- main = S.'drain' $ (print 1 |: print 2 |: nil) <> (print 3 |: print 4 |: nil) +-- main = S.'drain' $ (print 1 |: print 2 |: S.nil) <> (print 3 |: print 4 |: S.nil) -- @ -- @ -- 1 @@ -580,7 +580,7 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- irrespective of the type of stream: -- -- @ --- main = S.'drain' $ (print 1 |: print 2 |: nil) \`serial` (print 3 |: print 4 |: nil) +-- main = S.'drain' $ (print 1 |: print 2 |: S.nil) \`S.serial` (print 3 |: print 4 |: S.nil) -- @ -- $interleaved @@ -597,7 +597,7 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- The following example prints the sequence 1, 3, 2, 4 -- -- @ --- main = S.'drain' . 'wSerially' $ (print 1 |: print 2 |: nil) <> (print 3 |: print 4 |: nil) +-- main = S.'drain' . S.'wSerially' $ (print 1 |: print 2 |: S.'nil') <> (print 3 |: print 4 |: S.'nil') -- @ -- @ -- 1 @@ -612,7 +612,7 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- thread and take a combined total of @3 + 2 + 1 = 6@ seconds: -- -- @ --- main = S.'drain' . 'wSerially' $ delay 3 <> delay 2 <> delay 1 +-- main = S.'drain' . S.'wSerially' $ delay 3 <> delay 2 <> delay 1 -- @ -- @ -- ThreadId 36: Delay 3 @@ -626,7 +626,7 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- combinator in the following example: -- -- @ --- main = S.'drain' $ (print 1 |: print 2 |: nil) \`wSerial` (print 3 |: print 4 |: nil) +-- main = S.'drain' $ (print 1 |: print 2 |: S.'nil') \`S.wSerial` (print 3 |: print 4 |: S.'nil') -- @ -- @ -- 1 @@ -656,7 +656,7 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- -- @ -- main = do --- xs \<- S.'toList' . 'aheadly' $ (p 1 |: p 2 |: nil) <> (p 3 |: p 4 |: nil) +-- xs \<- S.'toList' . S.'aheadly' $ (p 1 |: p 2 |: S.nil) <> (p 3 |: p 4 |: S.nil) -- print xs -- where p n = threadDelay 1000000 >> return n -- @ @@ -693,7 +693,7 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- -- @ -- main = do --- xs \<- S.'toList' . 'asyncly' $ (p 1 |: p 2 |: nil) <> (p 3 |: p 4 |: nil) +-- xs \<- S.'toList' . S.'asyncly' $ (p 1 |: p 2 |: S.'nil') <> (p 3 |: p 4 |: S.'nil') -- print xs -- where p n = threadDelay 1000000 >> return n -- @ @@ -708,7 +708,7 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- -- @ -- main = do --- xs \<- S.'toList' . 'asyncly' $ (serially $ p 1 |: p 2 |: nil) <> (serially $ p 3 |: p 4 |: nil) +-- xs \<- S.'toList' . S.'asyncly' $ (S.'serially' $ p 1 |: p 2 |: S.'nil') <> (serially $ p 3 |: p 4 |: S.'nil') -- print xs -- where p n = threadDelay 1000000 >> return n -- @ @@ -722,7 +722,7 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- (3, 2, 1) = 3@ seconds: -- -- @ --- main = S.'drain' . 'asyncly' $ delay 3 '<>' delay 2 '<>' delay 1 +-- main = S.'drain' . S.'asyncly' $ delay 3 '<>' delay 2 '<>' delay 1 -- @ -- @ -- ThreadId 42: Delay 1 @@ -741,7 +741,7 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- second as all of the actions are concurrent. -- -- @ --- main = S.'drain' . 'asyncly' $ (delay 1 <> delay 2) <> (delay 3 <> delay 4) +-- main = S.'drain' . S.'asyncly' $ (delay 1 <> delay 2) <> (delay 3 <> delay 4) -- @ -- @ -- 1 @@ -758,7 +758,7 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- even if none of them blocks: -- -- @ --- main = S.'drain' . 'asyncly' $ traced (sqrt 9) '<>' traced (sqrt 16) '<>' traced (sqrt 25) +-- main = S.'drain' . S.'asyncly' $ traced (sqrt 9) '<>' traced (sqrt 16) '<>' traced (sqrt 25) -- where traced m = S.'yieldM' (myThreadId >>= print) >> return m -- @ -- @ @@ -776,7 +776,7 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- not used the 'asyncly' combinator in the following example: -- -- @ --- main = S.'drain' $ delay 3 \`async` delay 2 \`async` delay 1 +-- main = S.'drain' $ delay 3 \`S.'async'` delay 2 \`S.'async'` delay 1 -- @ -- @ -- ThreadId 42: Delay 1 @@ -819,7 +819,7 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- first traversal order but this is not guaranteed. -- -- @ --- main = S.'drain' . 'wAsyncly' $ (serially $ print 1 |: print 2 |: nil) <> (serially $ print 3 |: print 4 |: nil) +-- main = S.'drain' . S.'wAsyncly' $ (S.'serially' $ print 1 |: print 2 |: S.'nil') <> (S.'serially' $ print 3 |: print 4 |: S.'nil') -- @ -- @ -- 1 @@ -834,7 +834,7 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- the 'wAsyncly' combinator in the following example: -- -- @ --- main = S.'drain' $ delay 3 \`wAsync` delay 2 \`wAsync` delay 1 +-- main = S.'drain' $ delay 3 \S.`wAsync` delay 2 \S.`wAsync` delay 1 -- @ -- @ -- ThreadId 42: Delay 1 @@ -873,11 +873,10 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- package to run this example: -- -- @ --- import "Streamly" -- import qualified Streamly.Prelude as S -- import Network.HTTP.Simple -- --- main = S.'drain' . 'parallely' $ google \<> bing \<> duckduckgo +-- main = S.'drain' . S.'parallely' $ google \<> bing \<> duckduckgo -- where -- google = get "https://www.google.com/search?q=haskell" -- bing = get "https://www.bing.com/search?q=haskell" @@ -891,7 +890,7 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- 'parallely' combinator in the following example: -- -- @ --- main = S.'drain' $ delay 3 \`parallel` delay 2 \`wAsync` delay 1 +-- main = S.'drain' $ delay 3 \`S.'parallel'` delay 2 \`S.'wAsync'` delay 1 -- @ -- @ -- ThreadId 42: Delay 1 @@ -921,27 +920,26 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- style using 'fold' or 'foldMap'. We have also provided some fold utilities -- to fold streams using the polymorphic combine operations: -- --- * 'foldWith' is like 'fold', it folds a 'Foldable' container of streams --- using the given composition operator. --- * 'foldMapWith' is like 'foldMap', it folds like @foldWith@ but also maps a --- function before folding. --- * 'forEachWith' is like @foldMapwith@ but the container argument comes before --- the function argument. +-- * 'concatFoldableWith' is like 'fold', it folds a 'Foldable' container of +-- streams using the given composition operator. +-- * 'concatMapFoldableWith' is like 'foldMap', it folds like +-- @concatFoldableWith@ but also maps a function before folding. +-- * 'concatForFoldableWith' is like @concatMapFoldableWith@ but the container +-- argument comes before the function argument. -- -- All of the following are equivalent and start ten concurrent tasks each with -- a delay from 1 to 10 seconds, resulting in the printing of each number every -- second: -- -- @ --- import "Streamly" -- import qualified "Streamly.Prelude" as S -- import Control.Concurrent -- -- main = do --- S.'drain' $ 'asyncly' $ foldMap delay [1..10] --- S.'drain' $ S.'foldWith' 'async' (map delay [1..10]) --- S.'drain' $ S.'foldMapWith' 'async' delay [1..10] --- S.'drain' $ S.'forEachWith' 'async' [1..10] delay +-- S.'drain' $ S.'asyncly' $ foldMap delay [1..10] +-- S.'drain' $ S.'concatFoldableWith' S.'async' (map delay [1..10]) +-- S.'drain' $ S.'concatMapFoldableWith' S.'async' delay [1..10] +-- S.'drain' $ S.'concatForFoldableWith' S.'async' [1..10] delay -- where delay n = S.'yieldM' $ threadDelay (n * 1000000) >> print n -- @ @@ -987,7 +985,6 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- instead. -- -- @ --- import "Streamly" -- import qualified "Streamly.Prelude" as S -- -- main = S.'drain' $ do @@ -1009,12 +1006,11 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- written like this: -- -- @ --- import "Streamly" -- import qualified "Streamly.Prelude" as S -- -- import Control.Monad (forever) -- --- main = S.'drain' $ forever $ S.yieldM getLine >>= S.yieldM . putStrLn +-- main = S.'drain' $ forever $ S.'yieldM' getLine >>= S.'yieldM' . putStrLn -- @ -- -- When multiple streams are composed using this style they nest in a DFS @@ -1023,7 +1019,6 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- loops in imperative programming. -- -- @ --- import "Streamly" -- import qualified "Streamly.Prelude" as S -- -- main = S.'drain' $ do @@ -1052,10 +1047,9 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- type of the stream as 'Ahead'. -- -- @ --- import "Streamly" -- import qualified "Streamly.Prelude" as S -- --- comp = S.'toList' . 'aheadly' $ do +-- comp = S.'toList' . S.'aheadly' $ do -- x <- S.'fromFoldable' [3,2,1] -- delay x >> return x -- @@ -1093,10 +1087,9 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- type of the stream as 'Async'. -- -- @ --- import "Streamly" -- import qualified "Streamly.Prelude" as S -- --- main = S.'drain' . 'asyncly' $ do +-- main = S.'drain' . S.'asyncly' $ do -- x <- S.'fromFoldable' [3,2,1] -- delay x -- @ @@ -1120,10 +1113,9 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- on the demand from the consumer of the stream. -- -- @ --- import "Streamly" -- import qualified "Streamly.Prelude" as S -- --- main = S.'drain' . 'asyncly' $ do +-- main = S.'drain' . S.'asyncly' $ do -- x <- S.'fromFoldable' [1,2] -- y <- S.'fromFoldable' [3,4] -- S.'yieldM' $ putStrLn $ show (x, y) @@ -1145,10 +1137,9 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- type of the stream as 'WSerial'. -- -- @ --- import "Streamly" -- import qualified "Streamly.Prelude" as S -- --- main = S.'drain' . 'wSerially' $ do +-- main = S.'drain' . S.'wSerially' $ do -- x <- S.'fromFoldable' [1,2] -- y <- S.'fromFoldable' [3,4] -- S.yieldM $ putStrLn $ show (x, y) @@ -1173,10 +1164,9 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- specify the type of the stream as 'WAsync'. -- -- @ --- import "Streamly" -- import qualified "Streamly.Prelude" as S -- --- main = S.'drain' . 'wAsyncly' $ do +-- main = S.'drain' . S.'wAsyncly' $ do -- x <- S.'fromFoldable' [1,2] -- y <- S.'fromFoldable' [3,4] -- S.'yieldM' $ putStrLn $ show (x, y) @@ -1199,10 +1189,9 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- annotation can be used to specify the type of the stream as 'Parallel'. -- -- @ --- import "Streamly" -- import qualified "Streamly.Prelude" as S -- --- main = S.'drain' . 'parallely' $ do +-- main = S.'drain' . S.'parallely' $ do -- x <- S.'fromFoldable' [3,2,1] -- delay x -- @ @@ -1220,10 +1209,9 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- specific mode of composition. For example take a look at the following code. -- -- @ --- import "Streamly" -- import qualified "Streamly.Prelude" as S -- --- composed :: (IsStream t, Monad (t IO)) => t IO () +-- composed :: (S.IsStream t, Monad (t IO)) => t IO () -- composed = do -- sz <- sizes -- cl <- colors @@ -1240,11 +1228,11 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- Now we can interpret this in whatever way we want: -- -- @ --- main = S.'drain' . 'serially' $ composed --- main = S.'drain' . 'wSerially' $ composed --- main = S.'drain' . 'asyncly' $ composed --- main = S.'drain' . 'wAsyncly' $ composed --- main = S.'drain' . 'parallely' $ composed +-- main = S.'drain' . S.'serially' $ composed +-- main = S.'drain' . S.'wSerially' $ composed +-- main = S.'drain' . S.'asyncly' $ composed +-- main = S.'drain' . S.'wAsyncly' $ composed +-- main = S.'drain' . S.'parallely' $ composed -- @ -- -- As an exercise try to figure out the output of this code for each mode of @@ -1257,7 +1245,6 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- serial. -- -- @ --- import "Streamly" -- import qualified "Streamly.Prelude" as S -- -- main = (S.'toList' $ fmap show $ S.'fromFoldable' [1..10]) >>= print @@ -1276,7 +1263,6 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- serially and takes a total 17 seconds (1 + 3 + 4 + 2 + 3 + 4): -- -- @ --- import "Streamly" -- import qualified "Streamly.Prelude" as S -- import Control.Concurrent -- @@ -1284,7 +1270,7 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- s2 = d 3 <> d 4 -- d n = delay n >> return n -- --- main = (S.'toList' . 'serially' $ (,) \<$> s1 \<*> s2) >>= print +-- main = (S.'toList' . S.'serially' $ (,) \<$> s1 \<*> s2) >>= print -- @ -- @ -- ThreadId 36: Delay 1 @@ -1300,7 +1286,7 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- order but since it is serial it takes a total of 17 seconds: -- -- @ --- main = (S.'toList' . 'wSerially' $ (,) \<$> s1 \<*> s2) >>= print +-- main = (S.'toList' . S.'wSerially' $ (,) \<$> s1 \<*> s2) >>= print -- @ -- @ -- ThreadId 36: Delay 1 @@ -1316,7 +1302,7 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- of 6 seconds which is max (1, 2) + max (3, 4): -- -- @ --- main = (S.'toList' . 'asyncly' $ (,) \<$> s1 \<*> s2) >>= print +-- main = (S.'toList' . S.'asyncly' $ (,) \<$> s1 \<*> s2) >>= print -- @ -- @ -- ThreadId 34: Delay 1 @@ -1332,7 +1318,7 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- therefore takes a total of 6 seconds (2 + 4): -- -- @ --- main = (S.'toList' . 'wAsyncly' $ (,) \<$> s1 \<*> s2) >>= print +-- main = (S.'toList' . S.'wAsyncly' $ (,) \<$> s1 \<*> s2) >>= print -- @ -- @ -- ThreadId 34: Delay 1 @@ -1360,15 +1346,14 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- zip composition: -- -- @ --- import "Streamly" -- import qualified "Streamly.Prelude" as S -- import Control.Concurrent -- -- d n = delay n >> return n --- s1 = 'serially' $ d 1 <> d 2 --- s2 = 'serially' $ d 3 <> d 4 +-- s1 = S.'serially' $ d 1 <> d 2 +-- s2 = S.'serially' $ d 3 <> d 4 -- --- main = (S.'toList' . 'zipSerially' $ (,) \<$> s1 \<*> s2) >>= print +-- main = (S.'toList' . S.'zipSerially' $ (,) \<$> s1 \<*> s2) >>= print -- @ -- -- This takes total 10 seconds to zip, which is (1 + 2 + 3 + 4) since @@ -1390,18 +1375,17 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- -- -- @ --- import "Streamly" -- import qualified "Streamly.Prelude" as S -- import Control.Concurrent -- import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering)) -- -- d n = delay n >> return n --- s1 = 'serially' $ d 1 <> d 2 --- s2 = 'serially' $ d 3 <> d 4 +-- s1 = S.'serially' $ d 1 <> d 2 +-- s2 = S.'serially' $ d 3 <> d 4 -- -- main = do -- hSetBuffering stdout LineBuffering --- (S.'toList' . 'zipAsyncly' $ (,) \<$> s1 \<*> s2) >>= print +-- (S.'toList' . S.'zipAsyncly' $ (,) \<$> s1 \<*> s2) >>= print -- @ -- -- This takes 7 seconds to zip, which is max (1,3) + max (2,4) because 1 and 3 @@ -1432,17 +1416,16 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- combinator instead of explicitly folding with 'async'. -- -- @ --- import "Streamly" -- import qualified "Streamly.Prelude" as S -- import Data.List (sum) -- -- main = do -- z \<- S.'toList' --- $ 'serially' -- Serial monadic processing (sqrt below) +-- $ S.'serially' -- Serial monadic processing (sqrt below) -- $ do --- x2 \<- 'forEachWith' 'async' [1..100] $ -- Concurrent @"for"@ loop +-- x2 \<- S.'concatForFoldableWith' S.'async' [1..100] $ -- Concurrent @"for"@ loop -- \\x -> return $ x * x -- body of the loop --- y2 \<- 'forEachWith' 'async' [1..100] $ +-- y2 \<- S.'concatForFoldableWith' S.'async' [1..100] $ -- \\y -> return $ y * y -- return $ sqrt (x2 + y2) -- print $ sum z @@ -1477,11 +1460,11 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- @ -- {-\# LANGUAGE FlexibleContexts #-} -- --- import "Streamly" +-- import "Streamly.Prelude" (MonadAsync, SerialT) -- import "Streamly.Prelude" as S --- import Control.Monad (void, when) +-- import Control.Monad (void) -- import Control.Monad.IO.Class (MonadIO(liftIO)) --- import Control.Monad.State (MonadState, get, modify, runStateT, put) +-- import Control.Monad.State (MonadState, get, modify, runStateT) -- -- data Event = Quit | Harm Int | Heal Int deriving (Show) -- @@ -1497,13 +1480,13 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- _ -> putStrLn "Type potion or harm or quit" >> askUser -- -- acidRain :: MonadAsync m => 'SerialT' m Event --- acidRain = 'asyncly' $ 'constRate' 1 $ S.'repeatM' $ liftIO $ return $ Harm 1 +-- acidRain = S.'asyncly' $ S.'constRate' 1 $ S.'repeatM' $ liftIO $ return $ Harm 1 -- -- data Result = Check | Done -- -- runEvents :: (MonadAsync m, MonadState Int m) => 'SerialT' m Result -- runEvents = do --- event \<- userAction \`parallel` acidRain +-- event \<- userAction \`S.'parallel'` acidRain -- case event of -- Harm n -> modify (\\h -> h - n) >> return Check -- Heal n -> modify (\\h -> h + n) >> return Check @@ -1556,12 +1539,12 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- Interop with @vector@: -- -- @ --- import Streamly --- import qualified Streamly.Prelude as S +-- import "Streamly.Prelude" (SerialT) +-- import qualified "Streamly.Prelude" as S -- import qualified Data.Vector.Fusion.Stream.Monadic as V -- -- -- | vector to streamly --- fromVector :: (IsStream t, Monad m) => V.Stream m a -> t m a +-- fromVector :: (S.IsStream t, Monad m) => V.Stream m a -> t m a -- fromVector = S.unfoldrM unconsV -- where -- unconsV v = do @@ -1574,7 +1557,7 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- -- -- | streamly to vector -- toVector :: Monad m => SerialT m a -> V.Stream m a --- toVector = V.unfoldrM (S.uncons . adapt) +-- toVector = V.unfoldrM (S.uncons . S.adapt) -- -- main = do -- S.toList (fromVector (V.fromList [1..3])) >>= print @@ -1584,13 +1567,13 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- Interop with @pipes@: -- -- @ --- import "Streamly" +-- import "Streamly.Prelude" (SerialT) -- import qualified "Streamly.Prelude" as S -- import qualified Pipes as P -- import qualified Pipes.Prelude as P -- -- -- | pipes to streamly --- fromPipes :: (IsStream t, Monad m) => P.Producer a m r -> t m a +-- fromPipes :: (S.IsStream t, Monad m) => P.Producer a m r -> t m a -- fromPipes = S.'unfoldrM' unconsP -- where -- -- Adapt P.next to return a Maybe instead of Either @@ -1611,7 +1594,7 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- Interop with @streaming@: -- -- @ --- import "Streamly" +-- import "Streamly.Prelude" (SerialT, MonadAsync) -- import qualified "Streamly.Prelude" as S -- import qualified Streaming as SG -- import qualified Streaming.Prelude as SG @@ -1635,7 +1618,7 @@ import Control.Monad.Trans.Class (MonadTrans (lift)) -- Interop with @conduit@: -- -- @ --- import "Streamly" +-- import "Streamly.Prelude" (SerialT) -- import qualified "Streamly.Prelude" as S -- import qualified Data.Conduit as C -- import qualified Data.Conduit.List as C diff --git a/test/Main.hs b/test/Main.hs index 1e583b046e..688ae45ee8 100644 --- a/test/Main.hs +++ b/test/Main.hs @@ -13,14 +13,18 @@ import Data.Foldable (forM_, fold) import Data.Function ((&)) import Data.List (sort) import Data.Maybe (fromJust, isJust) +#if !(MIN_VERSION_base(4,11,0)) +import Data.Semigroup (Semigroup(..)) +#endif import System.Mem (performMajorGC) import Data.IORef import Test.Hspec as H -import Streamly -import Streamly.Prelude ((.:), nil) -import qualified Streamly as S +import Streamly.Prelude + ( AsyncT, SerialT, WSerialT, MonadAsync, IsStream, (.:), nil, adapt + , ahead, async, aheadly, asyncly, serially, parallely, wAsync, wAsyncly + , wSerially, maxBuffer) import qualified Streamly.Prelude as S import qualified Streamly.Internal.Data.Stream.IsStream as IP @@ -478,7 +482,7 @@ parallelTests = H.parallel $ do it "wAsync maintains independent states in concurrent tasks" (monadicStateSnapshotOp wAsync) it "parallel maintains independent states in concurrent tasks" - (monadicStateSnapshotOp Streamly.parallel) + (monadicStateSnapshotOp S.parallel) --------------------------------------------------------------------------- -- Slower tests are at the end @@ -757,8 +761,8 @@ composeWithMonadThrow t = do oneLevelNestedProduct desc t1 = it ("One level nested product" <> desc) $ do - let s1 = t $ S.foldMapWith (<>) return [1..4] - s2 = t1 $ S.foldMapWith (<>) return [5..8] + let s1 = t $ S.concatMapFoldableWith (<>) return [1..4] + s2 = t1 $ S.concatMapFoldableWith (<>) return [5..8] try $ tl (do x <- adapt s1 y <- s2 @@ -783,8 +787,8 @@ _composeWithMonadError t = do nestTwoSerial :: Expectation nestTwoSerial = - let s1 = S.foldMapWith (<>) return [1..4] - s2 = S.foldMapWith (<>) return [5..8] + let s1 = S.concatMapFoldableWith (<>) return [1..4] + s2 = S.concatMapFoldableWith (<>) return [5..8] in toListSerial (do x <- s1 y <- s2 @@ -793,8 +797,8 @@ nestTwoSerial = nestTwoAhead :: Expectation nestTwoAhead = - let s1 = S.foldMapWith (<>) return [1..4] - s2 = S.foldMapWith (<>) return [5..8] + let s1 = S.concatMapFoldableWith (<>) return [1..4] + s2 = S.concatMapFoldableWith (<>) return [5..8] in (S.toList . aheadly) (do x <- s1 y <- s2 @@ -803,22 +807,22 @@ nestTwoAhead = nestTwoSerialApp :: Expectation nestTwoSerialApp = - let s1 = S.foldMapWith (<>) return [1..4] - s2 = S.foldMapWith (<>) return [5..8] + let s1 = S.concatMapFoldableWith (<>) return [1..4] + s2 = S.concatMapFoldableWith (<>) return [5..8] in toListSerial ((+) <$> s1 <*> s2) `shouldReturn` ([6,7,8,9,7,8,9,10,8,9,10,11,9,10,11,12] :: [Int]) nestTwoAheadApp :: Expectation nestTwoAheadApp = - let s1 = S.foldMapWith (<>) return [1..4] - s2 = S.foldMapWith (<>) return [5..8] + let s1 = S.concatMapFoldableWith (<>) return [1..4] + s2 = S.concatMapFoldableWith (<>) return [5..8] in (S.toList . aheadly) ((+) <$> s1 <*> s2) `shouldReturn` ([6,7,8,9,7,8,9,10,8,9,10,11,9,10,11,12] :: [Int]) nestTwoInterleaved :: Expectation nestTwoInterleaved = - let s1 = S.foldMapWith (<>) return [1..4] - s2 = S.foldMapWith (<>) return [5..8] + let s1 = S.concatMapFoldableWith (<>) return [1..4] + s2 = S.concatMapFoldableWith (<>) return [5..8] in toListInterleaved (do x <- s1 y <- s2 @@ -827,15 +831,15 @@ nestTwoInterleaved = nestTwoInterleavedApp :: Expectation nestTwoInterleavedApp = - let s1 = S.foldMapWith (<>) return [1..4] - s2 = S.foldMapWith (<>) return [5..8] + let s1 = S.concatMapFoldableWith (<>) return [1..4] + s2 = S.concatMapFoldableWith (<>) return [5..8] in toListInterleaved ((+) <$> s1 <*> s2) `shouldReturn` ([6,7,7,8,8,8,9,9,9,9,10,10,10,11,11,12] :: [Int]) nestTwoAsync :: Expectation nestTwoAsync = - let s1 = S.foldMapWith (<>) return [1..4] - s2 = S.foldMapWith (<>) return [5..8] + let s1 = S.concatMapFoldableWith (<>) return [1..4] + s2 = S.concatMapFoldableWith (<>) return [5..8] in sort <$> toListAsync (do x <- s1 y <- s2 @@ -844,15 +848,15 @@ nestTwoAsync = nestTwoAsyncApp :: Expectation nestTwoAsyncApp = - let s1 = S.foldMapWith (<>) return [1..4] - s2 = S.foldMapWith (<>) return [5..8] + let s1 = S.concatMapFoldableWith (<>) return [1..4] + s2 = S.concatMapFoldableWith (<>) return [5..8] in sort <$> toListAsync ((+) <$> s1 <*> s2) `shouldReturn` sort ([6,7,8,9,7,8,9,10,8,9,10,11,9,10,11,12] :: [Int]) nestTwoWAsync :: Expectation nestTwoWAsync = - let s1 = S.foldMapWith (<>) return [1..4] - s2 = S.foldMapWith (<>) return [5..8] + let s1 = S.concatMapFoldableWith (<>) return [1..4] + s2 = S.concatMapFoldableWith (<>) return [5..8] in sort <$> (S.toList . wAsyncly) (do x <- s1 y <- s2 @@ -861,8 +865,8 @@ nestTwoWAsync = nestTwoParallel :: Expectation nestTwoParallel = - let s1 = S.foldMapWith (<>) return [1..4] - s2 = S.foldMapWith (<>) return [5..8] + let s1 = S.concatMapFoldableWith (<>) return [1..4] + s2 = S.concatMapFoldableWith (<>) return [5..8] in sort <$> (S.toList . parallely) (do x <- s1 y <- s2 @@ -871,15 +875,15 @@ nestTwoParallel = nestTwoWAsyncApp :: Expectation nestTwoWAsyncApp = - let s1 = S.foldMapWith (<>) return [1..4] - s2 = S.foldMapWith (<>) return [5..8] + let s1 = S.concatMapFoldableWith (<>) return [1..4] + s2 = S.concatMapFoldableWith (<>) return [5..8] in sort <$> (S.toList . wAsyncly) ((+) <$> s1 <*> s2) `shouldReturn` sort ([6,7,7,8,8,8,9,9,9,9,10,10,10,11,11,12] :: [Int]) nestTwoParallelApp :: Expectation nestTwoParallelApp = - let s1 = S.foldMapWith (<>) return [1..4] - s2 = S.foldMapWith (<>) return [5..8] + let s1 = S.concatMapFoldableWith (<>) return [1..4] + s2 = S.concatMapFoldableWith (<>) return [5..8] in sort <$> (S.toList . parallely) ((+) <$> s1 <*> s2) `shouldReturn` sort ([6,7,7,8,8,8,9,9,9,9,10,10,10,11,11,12] :: [Int]) @@ -918,7 +922,7 @@ composeAndComposeSimple -> (t2 IO Int -> t2 IO Int) -> [[Int]] -> Spec composeAndComposeSimple t1 t2 answer = do - let rfold = adapt . t2 . S.foldMapWith (<>) return + let rfold = adapt . t2 . S.concatMapFoldableWith (<>) return it "Compose right associated outer expr, right folded inner" $ (S.toList . t1) (rfold [1,2,3] <> (rfold [4,5,6] <> rfold [7,8,9])) `shouldReturn` head answer @@ -966,10 +970,10 @@ bindAndComposeSimple -> (t2 IO Int -> t2 IO Int) -> Spec bindAndComposeSimple t1 t2 = do - -- XXX need a bind in the body of forEachWith instead of a simple return + -- XXX need a bind in the body of concatForFoldableWith instead of a simple return it "Compose many (right fold) with bind" $ (sort <$> (S.toList . t1) - (adapt . t2 $ S.forEachWith (<>) [1..10 :: Int] return)) + (adapt . t2 $ S.concatForFoldableWith (<>) [1..10 :: Int] return)) `shouldReturn` [1..10] it "Compose many (left fold) with bind" $ @@ -1079,7 +1083,7 @@ testFromCallback :: IO Int testFromCallback = do ref <- newIORef Nothing let stream = S.map Just (IP.fromCallback (setCallback ref)) - `Streamly.parallel` runCallback ref + `S.parallel` runCallback ref S.sum $ S.map fromJust $ S.takeWhile isJust stream where diff --git a/test/PureStreams.hs b/test/PureStreams.hs index 2ef5b27e55..1abf5c50b4 100644 --- a/test/PureStreams.hs +++ b/test/PureStreams.hs @@ -4,15 +4,17 @@ module Main (main) where +#if !(MIN_VERSION_base(4,11,0)) +import Data.Semigroup ((<>)) +#endif import Test.Hspec import qualified GHC.Exts as GHC -import Streamly - #ifdef USE_STREAMLY_LIST import Data.Functor.Identity import Streamly.Internal.Data.List (List(..), pattern Cons, pattern Nil, ZipList(..), fromZipList, toZipList) +import Streamly.Prelude (SerialT) import qualified Streamly.Prelude as S #else import Prelude -- to suppress compiler warning diff --git a/test/Streamly/Test/Array.hs b/test/Streamly/Test/Array.hs index 59cd053422..99ba6511a7 100644 --- a/test/Streamly/Test/Array.hs +++ b/test/Streamly/Test/Array.hs @@ -17,8 +17,7 @@ import Test.QuickCheck.Monadic (monadicIO, assert, run) import Test.Hspec as H -import Streamly (SerialT) - +import Streamly.Prelude (SerialT) import qualified Streamly.Prelude as S #ifdef TEST_SMALL_ARRAY diff --git a/test/Streamly/Test/FileSystem/Event.hs b/test/Streamly/Test/FileSystem/Event.hs index 83009543df..313733948c 100644 --- a/test/Streamly/Test/FileSystem/Event.hs +++ b/test/Streamly/Test/FileSystem/Event.hs @@ -17,7 +17,7 @@ import Data.List.NonEmpty (NonEmpty) import Data.Word (Word8) import Streamly.Internal.Data.Array.Storable.Foreign (Array) import System.Environment (getArgs) -import Streamly (SerialT) +import Streamly.Prelude (SerialT) import qualified Data.List.NonEmpty as NonEmpty import qualified Streamly.Unicode.Stream as Unicode diff --git a/test/Streamly/Test/Prelude.hs b/test/Streamly/Test/Prelude.hs index b71186ee82..f120e985e9 100644 --- a/test/Streamly/Test/Prelude.hs +++ b/test/Streamly/Test/Prelude.hs @@ -80,7 +80,7 @@ import Data.List ) import Data.Maybe (mapMaybe) #if __GLASGOW_HASKELL__ < 808 -import Data.Semigroup ((<>)) +import Data.Semigroup (Semigroup, (<>)) #endif import GHC.Word (Word8) import Test.Hspec.QuickCheck @@ -88,9 +88,9 @@ import Test.Hspec import Test.QuickCheck (Property, choose, forAll, withMaxSuccess) import Test.QuickCheck.Monadic (assert, monadicIO, run) -import Streamly -import Streamly.Prelude ((.:), nil) -import Streamly as S +import Streamly.Prelude + ( SerialT, IsStream, (.:), nil, (|&), serially, avgRate, rate, maxBuffer + , maxThreads, maxBuffer) import qualified Streamly.Prelude as S import qualified Streamly.Data.Fold as FL import qualified Streamly.Internal.Data.Fold as FL @@ -502,7 +502,7 @@ monoidOps desc z eq t = do prop (desc <> " Compose empty at the end") $ spec (singleton 1 <> z) [1] prop (desc <> " Compose two") $ spec (singleton 0 <> singleton 1) [0, 1] prop (desc <> " Compose many") $ - spec (S.forEachWith (<>) [1 .. 100] singleton) [1 .. 100] + spec (S.concatForFoldableWith (<>) [1 .. 100] singleton) [1 .. 100] -- These are not covered by the property tests prop (desc <> " Compose three - empty in the middle") $ @@ -553,8 +553,8 @@ semigroupOps -> (t IO Int -> SerialT IO Int) -> Spec semigroupOps desc eq t = do - prop (desc <> " <>") $ foldFromList (S.foldMapWith (<>) singleton) t eq - prop (desc <> " mappend") $ foldFromList (S.foldMapWith mappend singleton) t eq + prop (desc <> " <>") $ foldFromList (S.concatMapFoldableWith (<>) singleton) t eq + prop (desc <> " mappend") $ foldFromList (S.concatMapFoldableWith mappend singleton) t eq ------------------------------------------------------------------------------- -- Transformation operations @@ -850,7 +850,7 @@ folded = (\xs -> case xs of [x] -> return x -- singleton stream case - _ -> S.foldMapWith (<>) return xs) + _ -> S.concatMapFoldableWith (<>) return xs) makeCommonOps :: IsStream t => (t m a -> c) -> [(String, t m a -> c)] makeCommonOps t = diff --git a/test/Streamly/Test/Prelude/Ahead.hs b/test/Streamly/Test/Prelude/Ahead.hs index 5d60c5f895..2c7a8d4e50 100644 --- a/test/Streamly/Test/Prelude/Ahead.hs +++ b/test/Streamly/Test/Prelude/Ahead.hs @@ -17,7 +17,7 @@ import Test.Hspec.QuickCheck import Test.QuickCheck.Monadic (monadicIO, run) import Test.Hspec as H -import Streamly +import Streamly.Prelude import qualified Streamly.Prelude as S import Streamly.Test.Common diff --git a/test/Streamly/Test/Prelude/Async.hs b/test/Streamly/Test/Prelude/Async.hs index 96f95ca378..87eec8e76f 100644 --- a/test/Streamly/Test/Prelude/Async.hs +++ b/test/Streamly/Test/Prelude/Async.hs @@ -15,7 +15,7 @@ import Data.Semigroup ((<>)) import Test.Hspec.QuickCheck import Test.Hspec as H -import Streamly +import Streamly.Prelude import qualified Streamly.Prelude as S import Streamly.Test.Prelude diff --git a/test/Streamly/Test/Prelude/Concurrent.hs b/test/Streamly/Test/Prelude/Concurrent.hs index 8820cda8c3..b9b85a3648 100644 --- a/test/Streamly/Test/Prelude/Concurrent.hs +++ b/test/Streamly/Test/Prelude/Concurrent.hs @@ -27,7 +27,7 @@ import Test.QuickCheck (Property, withMaxSuccess) import Test.QuickCheck.Monadic (monadicIO, run) -import Streamly +import Streamly.Prelude hiding (replicateM, reverse) import qualified Streamly.Prelude as S import Streamly.Test.Common diff --git a/test/Streamly/Test/Prelude/Parallel.hs b/test/Streamly/Test/Prelude/Parallel.hs index 8ef8c533d9..1c78868f0a 100644 --- a/test/Streamly/Test/Prelude/Parallel.hs +++ b/test/Streamly/Test/Prelude/Parallel.hs @@ -15,7 +15,7 @@ import Data.Semigroup ((<>)) import Test.Hspec.QuickCheck import Test.Hspec as H -import Streamly +import Streamly.Prelude import qualified Streamly.Prelude as S import Streamly.Test.Prelude diff --git a/test/Streamly/Test/Prelude/Serial.hs b/test/Streamly/Test/Prelude/Serial.hs index d6b9b1ddbe..326dcf5ab4 100644 --- a/test/Streamly/Test/Prelude/Serial.hs +++ b/test/Streamly/Test/Prelude/Serial.hs @@ -29,7 +29,8 @@ import Test.QuickCheck import Test.QuickCheck.Monadic (assert, monadicIO, run) import Test.Hspec as H -import Streamly +import Streamly.Prelude + ( SerialT, IsStream, avgRate, maxBuffer, serial, serially) import qualified Streamly.Prelude as S import qualified Streamly.Data.Fold as FL diff --git a/test/Streamly/Test/Prelude/WAsync.hs b/test/Streamly/Test/Prelude/WAsync.hs index 4618e26460..602649ee98 100644 --- a/test/Streamly/Test/Prelude/WAsync.hs +++ b/test/Streamly/Test/Prelude/WAsync.hs @@ -15,7 +15,7 @@ import Data.Semigroup ((<>)) import Test.Hspec.QuickCheck import Test.Hspec as H -import Streamly +import Streamly.Prelude import qualified Streamly.Prelude as S import Streamly.Test.Prelude diff --git a/test/Streamly/Test/Prelude/WSerial.hs b/test/Streamly/Test/Prelude/WSerial.hs index 55ea44a0f2..74748d1dff 100644 --- a/test/Streamly/Test/Prelude/WSerial.hs +++ b/test/Streamly/Test/Prelude/WSerial.hs @@ -17,7 +17,7 @@ import Test.Hspec.QuickCheck import Test.QuickCheck.Monadic (monadicIO, run) import Test.Hspec as H -import Streamly +import Streamly.Prelude import qualified Streamly.Prelude as S import Streamly.Test.Common diff --git a/test/Streamly/Test/Prelude/ZipAsync.hs b/test/Streamly/Test/Prelude/ZipAsync.hs index cff42fb07e..44cad00da7 100644 --- a/test/Streamly/Test/Prelude/ZipAsync.hs +++ b/test/Streamly/Test/Prelude/ZipAsync.hs @@ -12,7 +12,7 @@ module Streamly.Test.Prelude.ZipAsync where import Test.Hspec.QuickCheck import Test.Hspec as H -import Streamly +import Streamly.Prelude import qualified Streamly.Prelude as S import Streamly.Test.Prelude diff --git a/test/Streamly/Test/Prelude/ZipSerial.hs b/test/Streamly/Test/Prelude/ZipSerial.hs index 523bb27eef..d0fe14445d 100644 --- a/test/Streamly/Test/Prelude/ZipSerial.hs +++ b/test/Streamly/Test/Prelude/ZipSerial.hs @@ -15,7 +15,7 @@ import Data.Semigroup ((<>)) import Test.Hspec.QuickCheck import Test.Hspec as H -import Streamly +import Streamly.Prelude import qualified Streamly.Prelude as S import Streamly.Test.Prelude diff --git a/test/loops.hs b/test/loops.hs index e36cf17b42..aa8f021eaf 100644 --- a/test/loops.hs +++ b/test/loops.hs @@ -1,6 +1,8 @@ -import Streamly +#if !(MIN_VERSION_base(4,11,0)) +import Data.Semigroup ((<>)) +#endif import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering)) -import Streamly.Prelude (nil, yieldM, drain) +import Streamly.Prelude main :: IO () main = do diff --git a/test/nested-loops.hs b/test/nested-loops.hs index 8f33723de7..c31a8693eb 100644 --- a/test/nested-loops.hs +++ b/test/nested-loops.hs @@ -1,8 +1,10 @@ +#if !(MIN_VERSION_base(4,11,0)) +import Data.Semigroup ((<>)) +#endif import Control.Concurrent (myThreadId) import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering)) import System.Random (randomIO) -import Streamly -import Streamly.Prelude (drain, nil, yieldM) +import Streamly.Prelude main :: IO () main = drain $ do diff --git a/test/parallel-loops.hs b/test/parallel-loops.hs index cc36a6a6f5..8bf2c32fda 100644 --- a/test/parallel-loops.hs +++ b/test/parallel-loops.hs @@ -1,7 +1,7 @@ import Control.Concurrent (myThreadId, threadDelay) import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering)) import System.Random (randomIO) -import Streamly +import Streamly.Prelude import qualified Streamly.Prelude as S main :: IO ()