Streamly is a library to make concurrent programming a joy. The venerable
async
package is the go to package for concurrent programming for most
Haskellers. Streamly is a higher level library than async
and provides a lot
more power and functionality, using a simpler and concise expression of
concurrency. At a high level, you should be able to express everything with
streamly that you can with async
, if you can't please raise an issue. If you
are familiar with async
, in this document we highlight how streamly can be
used where you would use async
.
Unlike async
, streamly does not use a spawn and wait
model. Streamly uses
a more high level approach to concurrency and has no explicit notion of
threads. In streamly, we compose multiple actions as a stream and then express
whether you want to run the actions in the stream serially
or parallely
.
There are many different ways in which you can run streams concurrently, see
the reference documentation for details.
Since there is no explicit notion of threads in streamly, there are no
equivalents of async
, wait
, cancel
, poll
or link
combinators from the
async
package.
Since streamly is a monad transformer it can work with all monads and not just
IO, you won't need adaptations like lifted-async
to use it for a generic
monad.
You can write all of your program in a streamly monad and use the full power of
the library. Streamly can be used as a direct replacement of the IO monad with
no loss of performance, and no change in code except using liftIO
or fromEffect
to run any IO actions. Streamly IO monads (e.g. SerialT IO
) are just a
generalization of the IO monad with non-deterministic composition of streams
added on top.
However, if you would like to just run only some concurrent portions of your
program using streamly, you can do that too. Just use drain
if you want
to run the stream without collecting the outputs of the concurrent actions or
use toList
if you want to convert the output stream into a list. Other
stream folding operations can also be used, see the docs for more details.
Use the following imports to run the snippets shown below:
import Streamly
import Streamly.Prelude ((|:))
import qualified Streamly.Prelude as S
import qualified Data.Text as Text
import Control.Concurrent (threadDelay)
Let us simulate a URL fetch with a delay of n
seconds using the following
functions:
getURL :: Int -> IO String
getURL n = threadDelay (n * 1000000) >> return (show n)
getURLString = getURL
getURLText n = getURL n >>= return . Text.pack
You can run any number of actions concurrently. For example, to fetch two URLs concurrently:
urls <- S.toList $ fromParallel $ getURL 2 |: getURL 1 |: S.nil
This would return the results in their arrival order i.e. first 1 and then 2.
If you want to preserve the order of the results, use the lookahead style
using fromAhead
instead. In the following example both URLs are fetched
concurrently, and even though URL 1 arrives before URL 2 the results will
return 2 first and then 1.
urls <- S.toList $ fromAhead $ getURL 2 |: getURL 1 |: S.nil
Use drain
instead of toList
to run the actions but ignore the results:
S.drain $ fromParallel $ getURL 1 |: getURL 2 |: S.nil
If the actions that you are executing result in different output types you can use applicative zip to collect the results or to directly apply them to a function:
tuples <- S.toList $ fromZipAsync $
(,) <$> S.fromEffect (getURLString 1) <*> S.fromEffect (getURLText 2)
There are two ways to achieve the race functionality, using take
or using
exceptions.
We can run multiple actions concurrently and take the first result that arrives:
urls <- S.toList $ S.take 1 $ fromParallel $ getURL 1 |: getURL 2 |: S.nil
After the first result arrives, the rest of the actions are canceled
automatically. In general, we can take first n
results as they arrive:
urls <- S.toList $ S.take 2 $ fromParallel $ getURL 1 |: getURL 2 |: S.nil
When an exception occurs in a concurrent stream all the concurrently running actions are cacnceled on arrival of the exception. This can be used to implement the race functionality. Each action in the stream can use an exception to communicate the result. As soon as the first result arrives all other actions will be canceled, for example:
data Result = Result String deriving Show
instance Exception Result
main = do
url <- try $ S.drain $ fromParallel $
(getURL 2 >>= throwM . Result)
|: (getURL 1 >>= throwM . Result)
|: S.nil
case url of
Left (e :: SomeException) -> print e
Right _ -> undefined
There are many ways to map concurrently on a container and collect the results:
You can create a concurrent stream from a Foldable
container of monadic
actions:
urls <- S.toList $ fromAhead $ S.fromFoldableM $ fmap getURL [1..3]
You can first convert a Foldable
into a stream and then map an action on the
stream concurrently:
urls <- S.toList $ fromAhead $ S.mapM getURL $ foldMap return [1..3]
You can map a monadic action to a Foldable
container to convert it into a
stream and at the same time fold it:
urls <- S.toList $ fromAhead $ foldMap (S.fromEffect . getURL) [1..3]
Streamly has not just the equivalent of replicateConcurrently
which is
replicateM
but many more ways to generate concurrent streams, for example,
|:
, unfoldrM
, repeatM
, iterateM
, fromFoldableM
etc. See the
Streamly.Prelude
module documentation for more details.
xs <- S.toList $ fromParallel $ S.replicateM 2 $ getURL 1
The stream resulting from concurrent actions can be mapped serially or concurrently.
To map serially just use fmap
:
xs <- S.toList $ fromParallel $ fmap (+1) $ return 1 |: return 2 |: S.nil
To map a monadic action concurrently on all elements of the stream use mapM
:
xs <- S.toList $ fromParallel $ S.mapM (\x -> return (x + 1))
$ return 1 |: return 2 |: S.nil
The Semigroup
instances of streamly merge multiple streams serially or
concurrently.
The Monad
instances of streamly nest loops concurrently (concurrent
non-determinism).
Streamly has very little concurrency overhead (ranging from a few 100
nanoseconds to a few microseconds on a 2.2 GHz Intel Core i7), you can even run
very lightweight actions in parallel without worrying about the overhead of
concurrency. See the performance benchmarks comparing streamly with the async
package in this repo.
There is much more that you can do with streamly. For example, you can use the
maxThreads
combinator to restrict the total number of concurrent threads or
use the maxBuffer
combinator to restrict the total number of bufferred
results or you can use the avgRate
combinator to control the rate at which
the concurrent actions are executed.
See the haddock documentation on hackage and a comprehensive tutorial here.