Skip to content

Commit

Permalink
Deprecate Streamly module and move the exports to Streamly.Prelude.
Browse files Browse the repository at this point in the history
- Deprecate and replace functions:
    foldWith => concatFoldableWith
    foldMapWith => concatMapFoldableWith
    forEachWith => concatForFoldableWith
  • Loading branch information
pranaysashank committed Sep 9, 2020
1 parent e218fb9 commit ad05792
Show file tree
Hide file tree
Showing 73 changed files with 819 additions and 385 deletions.
9 changes: 9 additions & 0 deletions Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@
fully drained. Also, the resource acquisition and release is atomic with
respect to async exceptions.

### Deprecations

* The `Streamly` module is now deprecated, its functionality is subsumed
by `Streamly.Prelude`.
* Some functions from `Streamly` module have been renamed in `Streamly.Prelude` module:
* `foldWith` has been replaced by `concatFoldableWith`
* `foldMapWith` has been replaced by `concatMapFoldableWith`
* `forEachWith` has been replaced by `concatForFoldableWith`

## 0.7.2

### Bug Fixes
Expand Down
34 changes: 14 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ Please see [INSTALL.md](./INSTALL.md) for instructions on how to use streamly
with your Haskell build tool or package manager. You may want to go through it
before jumping to run the examples below.

The module `Streamly` provides just the core stream types, type casting and
concurrency control combinators. Stream construction, transformation, folding,
merging, zipping combinators are found in `Streamly.Prelude`.
The module `Streamly.Prelude` provides the core stream types and combinators
for type casting, controlling concurrency, stream construction, transformation,
folding, merging, and zipping.

## Streaming Pipelines

Expand All @@ -117,7 +117,6 @@ numbers from stdin, prints the squares of even numbers and exits if an even
number more than 9 is entered.

``` haskell
import Streamly
import qualified Streamly.Prelude as S
import Data.Function ((&))

Expand Down Expand Up @@ -150,15 +149,15 @@ each action is finished (asyncly):

``` haskell
> let p n = threadDelay (n * 1000000) >> return n
> S.toList $ asyncly $ p 3 |: p 2 |: p 1 |: S.nil
> S.toList $ S.asyncly $ p 3 |: p 2 |: p 1 |: S.nil
[1,2,3]
```

Use `aheadly` if you want speculative concurrency i.e. execute the actions in
the stream concurrently but consume the results in the specified order:

``` haskell
> S.toList $ aheadly $ p 3 |: p 2 |: p 1 |: S.nil
> S.toList $ S.aheadly $ p 3 |: p 2 |: p 1 |: S.nil
[3,2,1]
```

Expand All @@ -168,7 +167,7 @@ Monadic stream generation functions e.g. `unfoldrM`, `replicateM`, `repeatM`,
The following finishes in 10 seconds (100 seconds when serial):

``` haskell
S.drain $ asyncly $ S.replicateM 10 $ p 10
S.drain $ S.asyncly $ S.replicateM 10 $ p 10
```

## Concurrency Auto Scaling
Expand All @@ -177,7 +176,7 @@ Concurrency is auto-scaled i.e. more actions are executed concurrently if the
consumer is consuming the stream at a higher speed. How many tasks are executed
concurrently can be controlled by `maxThreads` and how many results are
buffered ahead of consumption can be controlled by `maxBuffer`. See the
documentation in the `Streamly` module.
documentation in the `Streamly.Prelude` module.

## Concurrent Streaming Pipelines

Expand All @@ -198,7 +197,7 @@ We can use `mapM` or `sequence` functions concurrently on a stream.

``` haskell
> let p n = threadDelay (n * 1000000) >> return n
> S.drain $ aheadly $ S.mapM (\x -> p 1 >> print x) (serially $ repeatM (p 1))
> S.drain $ S.aheadly $ S.mapM (\x -> p 1 >> print x) (S.serially $ S.repeatM (p 1))
```

## Serial and Concurrent Merging
Expand All @@ -209,11 +208,10 @@ stream, each with a delay of 1 to 10 seconds, respectively. Since all the
actions are concurrent we see one output printed every second:

``` haskell
import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent (threadDelay)

main = S.toList $ parallely $ foldMap delay [1..10]
main = S.toList $ S.parallely $ foldMap delay [1..10]
where delay n = S.yieldM $ threadDelay (n * 1000000) >> print n
```

Expand All @@ -222,7 +220,6 @@ below, see the tutorial for more ways. We use the following `delay`
function in the examples to demonstrate the concurrency aspects:

``` haskell
import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent

Expand All @@ -245,7 +242,7 @@ ThreadId 36: Delay 1
### Parallel

``` haskell
main = S.drain . parallely $ delay 3 <> delay 2 <> delay 1
main = S.drain . S.parallely $ delay 3 <> delay 2 <> delay 1
```
```
ThreadId 42: Delay 1
Expand All @@ -258,7 +255,6 @@ ThreadId 40: Delay 3
The monad instance composes like a list monad.

``` haskell
import Streamly
import qualified Streamly.Prelude as S

loops = do
Expand All @@ -282,7 +278,7 @@ loop can run concurrently but the results are presented to the consumer of the
output in the same order as serial execution:

``` haskell
main = S.drain $ aheadly $ loops
main = S.drain $ S.aheadly $ loops
```

Different stream types execute the loop iterations in different ways. For
Expand All @@ -298,11 +294,10 @@ to concurrently generate squares of a stream of numbers and then concurrently
sum the square roots of all combinations of two streams:

``` haskell
import Streamly
import qualified Streamly.Prelude as S

main = do
s <- S.sum $ asyncly $ do
s <- S.sum $ S.asyncly $ do
-- Each square is performed concurrently, (<>) is concurrent
x2 <- foldMap (\x -> return $ x * x) [1..100]
y2 <- foldMap (\y -> return $ y * y) [1..100]
Expand All @@ -324,7 +319,7 @@ directories concurrently:
```haskell
import Control.Monad.IO.Class (liftIO)
import Path.IO (listDir, getCurrentDir) -- from path-io package
import Streamly (AsyncT, adapt)
import Streamly.Prelude (AsyncT, adapt)
import qualified Streamly.Prelude as S

listDirRecursive :: AsyncT IO ()
Expand All @@ -350,10 +345,9 @@ For bounded concurrent streams, stream yield rate can be specified. For
example, to print hello once every second you can simply write this:

``` haskell
import Streamly
import Streamly.Prelude as S

main = S.drain $ asyncly $ avgRate 1 $ S.repeatM $ putStrLn "hello"
main = S.drain $ S.asyncly $ S.avgRate 1 $ S.repeatM $ putStrLn "hello"
```

For some practical uses of rate control, see
Expand Down
2 changes: 1 addition & 1 deletion benchmark/NanoBenchmarks.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}

import Streamly (SerialT)
import Streamly.Prelude (SerialT)
import Streamly.Internal.Data.SVar (MonadAsync)

import qualified Streamly.Data.Array.Storable.Foreign as A
Expand Down
1 change: 0 additions & 1 deletion benchmark/Streamly/Benchmark/Data/ArrayOps.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import Control.Monad.IO.Class (MonadIO)
import Prelude (Bool, Int, Maybe(..), ($), (+), (.), (==), (>), undefined)

import qualified Prelude as P
import qualified Streamly as S hiding (foldMapWith, runStream)
import qualified Streamly.Prelude as S

#ifndef DATA_PRIM_ARRAY
Expand Down
4 changes: 1 addition & 3 deletions benchmark/Streamly/Benchmark/Data/Fold.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import Prelude (IO, Int, Double, String, (>), (<*>), (<$>), (+), ($),
(<=), Monad(..), (==), Maybe(..), (.), fromIntegral,
compare, (>=), concat, seq)

import qualified Streamly as S hiding (runStream)
import qualified Streamly.Prelude as S
import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Internal.Data.Pipe as Pipe
Expand All @@ -30,7 +29,6 @@ import qualified Streamly.Internal.Data.Fold as IFL
import qualified Streamly.Internal.Data.Stream.IsStream as IP

import Gauge
import Streamly hiding (runStream)
import Streamly.Benchmark.Common

-- We need a monadic bind here to make sure that the function f does not get
Expand All @@ -52,7 +50,7 @@ source = sourceUnfoldrM
-- | Takes a fold method, and uses it with a default source.
{-# INLINE benchIOSink #-}
benchIOSink
:: (IsStream t, NFData b)
:: (S.IsStream t, NFData b)
=> Int -> String -> (t IO Int -> IO b) -> Benchmark
benchIOSink value name f = bench name $ nfIO $ randomRIO (1,1) >>= f . source value

Expand Down
5 changes: 2 additions & 3 deletions benchmark/Streamly/Benchmark/Data/Parser.hs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,13 @@ import Prelude
import qualified Data.Traversable as TR
import qualified Data.Foldable as F
import qualified Control.Applicative as AP
import qualified Streamly as S hiding (runStream)
import qualified Streamly.Prelude as S
import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Internal.Data.Parser as PR
import qualified Streamly.Internal.Data.Stream.IsStream as IP

import Gauge
import Streamly hiding (runStream)
import Streamly.Prelude (SerialT)
import Streamly.Benchmark.Common

-------------------------------------------------------------------------------
Expand All @@ -52,7 +51,7 @@ sourceUnfoldrM value n = S.unfoldrM step n
-- | Takes a fold method, and uses it with a default source.
{-# INLINE benchIOSink #-}
benchIOSink
:: (IsStream t, NFData b)
:: (S.IsStream t, NFData b)
=> Int -> String -> (t IO Int -> IO b) -> Benchmark
benchIOSink value name f =
bench name $ nfIO $ randomRIO (1,1) >>= f . sourceUnfoldrM value
Expand Down
5 changes: 2 additions & 3 deletions benchmark/Streamly/Benchmark/Data/Parser/ParserD.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@ import Prelude hiding (any, all, take, sequence, sequenceA, takeWhile)
import qualified Data.Traversable as TR
import qualified Data.Foldable as F
import qualified Control.Applicative as AP
import qualified Streamly as S hiding (runStream)
import qualified Streamly.Prelude as S
import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Internal.Data.Parser.ParserD as PR
import qualified Streamly.Internal.Data.Stream.IsStream as IP

import Gauge
import Streamly hiding (runStream)
import Streamly.Prelude (SerialT, MonadAsync, IsStream)
import Streamly.Benchmark.Common

-------------------------------------------------------------------------------
Expand All @@ -39,7 +38,7 @@ import Streamly.Benchmark.Common
-- XXX these can be moved to the common module

{-# INLINE sourceUnfoldrM #-}
sourceUnfoldrM :: (S.IsStream t, S.MonadAsync m) => Int -> Int -> t m Int
sourceUnfoldrM :: (IsStream t, MonadAsync m) => Int -> Int -> t m Int
sourceUnfoldrM value n = S.unfoldrM step n
where
step cnt =
Expand Down
5 changes: 2 additions & 3 deletions benchmark/Streamly/Benchmark/Data/Parser/ParserK.hs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,14 @@ import Prelude hiding (any, all, take, sequence, sequenceA, takeWhile)
import qualified Control.Applicative as AP
import qualified Data.Foldable as F
import qualified Data.Traversable as TR
import qualified Streamly as S hiding (runStream)
import qualified Streamly.Prelude as S
import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Internal.Data.Parser.ParserK.Types as PR
import qualified Streamly.Internal.Data.Parser.ParserD as PRD
import qualified Streamly.Internal.Data.Stream.IsStream as IP

import Gauge
import Streamly hiding (runStream)
import Streamly.Prelude (SerialT, MonadAsync, IsStream)
import Streamly.Benchmark.Common

-------------------------------------------------------------------------------
Expand All @@ -41,7 +40,7 @@ import Streamly.Benchmark.Common
-- XXX these can be moved to the common module

{-# INLINE sourceUnfoldrM #-}
sourceUnfoldrM :: (S.IsStream t, S.MonadAsync m) => Int -> Int -> t m Int
sourceUnfoldrM :: (IsStream t, MonadAsync m) => Int -> Int -> t m Int
sourceUnfoldrM value n = S.unfoldrM step n
where
step cnt =
Expand Down
10 changes: 5 additions & 5 deletions benchmark/Streamly/Benchmark/Data/Stream/StreamK.hs
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,11 @@ sourceFromFoldableM n =

{-# INLINE sourceFoldMapWith #-}
sourceFoldMapWith :: Int -> Stream m Int
sourceFoldMapWith n = SP.foldMapWith S.serial S.yield [n..n+value]
sourceFoldMapWith n = SP.concatMapFoldableWith S.serial S.yield [n..n+value]

{-# INLINE sourceFoldMapWithM #-}
sourceFoldMapWithM :: Monad m => Int -> Stream m Int
sourceFoldMapWithM n = SP.foldMapWith S.serial (S.yieldM . return) [n..n+value]
sourceFoldMapWithM n = SP.concatMapFoldableWith S.serial (S.yieldM . return) [n..n+value]

-------------------------------------------------------------------------------
-- Elimination
Expand Down Expand Up @@ -484,8 +484,8 @@ o_1_space =
, benchFold "fromFoldableM" toNull sourceFromFoldableM

-- appends
, benchFold "foldMapWith" toNull sourceFoldMapWith
, benchFold "foldMapWithM" toNull sourceFoldMapWithM
, benchFold "concatMapFoldableWith" toNull sourceFoldMapWith
, benchFold "concatMapFoldableWithM" toNull sourceFoldMapWithM
]
, bgroup "elimination"
[ benchFold "toNull" toNull sourceUnfoldrM
Expand Down Expand Up @@ -542,7 +542,7 @@ o_1_space =
, benchIOSrc1 "concatMapRepl (sqrt n of sqrt n)"
(concatMapRepl value2 value2)

-- This is for comparison with foldMapWith
-- This is for comparison with concatMapFoldableWith
, benchIOSrc1 "concatMapWithId (n of 1) (fromFoldable)"
(S.drain . S.concatMapBy S.serial id . sourceConcatMapId value)

Expand Down
1 change: 0 additions & 1 deletion benchmark/Streamly/Benchmark/Prelude/Adaptive.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import Control.Concurrent (threadDelay)
import Control.Monad (when)
import Control.Monad.IO.Class (liftIO)
import Gauge
import Streamly
import Streamly.Prelude as S
import System.Random (randomRIO)

Expand Down
2 changes: 1 addition & 1 deletion benchmark/Streamly/Benchmark/Prelude/Ahead.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import Prelude hiding (mapM)

import Streamly (aheadly, serially, ahead, maxBuffer, maxThreads)
import Streamly.Prelude (aheadly, serially, ahead, maxBuffer, maxThreads)
import qualified Streamly.Prelude as S

import Streamly.Benchmark.Common
Expand Down
2 changes: 1 addition & 1 deletion benchmark/Streamly/Benchmark/Prelude/Async.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import Prelude hiding (mapM)

import Streamly (asyncly, async, maxBuffer, maxThreads, serially)
import Streamly.Prelude (asyncly, async, maxBuffer, maxThreads, serially)
import qualified Streamly.Prelude as S
import qualified Streamly.Internal.Data.Stream.StreamK.Type as Internal

Expand Down
2 changes: 1 addition & 1 deletion benchmark/Streamly/Benchmark/Prelude/Concurrent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import Control.Concurrent
import Control.Monad (when, replicateM)

import Gauge
import Streamly
import Streamly.Prelude hiding (mapM_, replicateM)
import qualified Streamly.Prelude as S

-------------------------------------------------------------------------------
Expand Down
4 changes: 2 additions & 2 deletions benchmark/Streamly/Benchmark/Prelude/Parallel.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@

import Prelude hiding (mapM)

import Streamly (SerialT, parallely, parallel, serially, maxBuffer, maxThreads)
import Streamly.Prelude
( SerialT, parallely, parallel, serially, maxBuffer, maxThreads)

import qualified Streamly as S
import qualified Streamly.Prelude as S
import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Internal.Data.Stream.Parallel as Par
Expand Down
5 changes: 2 additions & 3 deletions benchmark/Streamly/Benchmark/Prelude/Rate.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@
-- License : BSD3
-- Maintainer : [email protected]

import Streamly (asyncly, aheadly, maxThreads)

import qualified Streamly as S
import Streamly.Prelude (asyncly, aheadly, maxThreads)
import qualified Streamly.Prelude as S

import Streamly.Benchmark.Common
import Streamly.Benchmark.Prelude
Expand Down
6 changes: 4 additions & 2 deletions benchmark/Streamly/Benchmark/Prelude/Serial.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
{-# OPTIONS_GHC -fplugin Test.Inspection.Plugin #-}
#endif

module Main where

import Control.DeepSeq (NFData(..))
import Control.Monad (when)
import Control.Monad.IO.Class (MonadIO(..))
Expand All @@ -48,14 +50,14 @@ import Test.Inspection
import qualified Streamly.Internal.Data.Stream.StreamD as D
#endif

import qualified Streamly as S

import qualified Streamly.Prelude as S
import qualified Streamly.Internal.Data.Stream.IsStream as Internal
import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Internal.Data.Unfold as UF

import Gauge
import Streamly
import Streamly.Prelude (SerialT, IsStream, serially, serial)
import Streamly.Benchmark.Common
import Streamly.Benchmark.Prelude
import Streamly.Internal.Data.Time.Units
Expand Down
Loading

0 comments on commit ad05792

Please sign in to comment.