Streamly streams generalize Haskell lists and offer a similar API, with two crucial additions:
- A list is a pure sequence of values whereas a stream is a monadic sequence of values i.e. a sequence of values generated by monadic actions.
- The monadic actions producing the sequence can be executed concurrently. Concurrency is declarative and can be switched on or off using an appropriate combinator.
A list is a sequence of pure values or pure steps that produce a value on evaluation. Ideally, we want to process each element in the sequence through a pipeline of processing stages without first accumulating all the elements in the sequence. This style of processing is known as "streaming". It runs in constant memory because data is consumed as soon as it is produced instead of being accumulated. If all the processing stages are pure, lists can be processed in a streaming style. For example, the following pipeline is processed in a streaming style:
replicate 10 1
& zipWith (+) [1..10]
& map (+1)
& filter odd
& sum
However, if a list is generated in a strict monad (e.g. IO), it cannot be
consumed in a streaming fashion as described above. In that case the full list
is first accumulated and then processed. Memory consumption is therefore
proportional to the size of the list, which makes this approach unusable for
large lists. For example, the following code accumulates `xs` before it
processes it:
xs <- replicateM 10 (return 1)
zipWith (+) [1..10] xs
& map (+1)
& filter odd
& sum
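The accumulation can be observed directly. The following is a minimal sketch using only `base`; the counter and the name `countedList` are illustrative and not part of the original example. Every action run by `replicateM` bumps a counter, and the counter is read before any element of the resulting list is consumed.

```haskell
import Control.Monad (replicateM)
import Data.IORef (modifyIORef', newIORef, readIORef)

-- Build a list in IO; each element's action increments a counter.
-- Returns the list together with the number of actions that had already
-- run by the time replicateM returned.
countedList :: Int -> IO ([Int], Int)
countedList n = do
    ref <- newIORef 0
    xs <- replicateM n (modifyIORef' ref (+ 1) >> readIORef ref)
    ran <- readIORef ref  -- read before touching any element of xs
    pure (xs, ran)

main :: IO ()
main = do
    (xs, ran) <- countedList 10
    -- All 10 actions have run before the first element is consumed.
    putStrLn ("actions run before consumption: " ++ show ran)
    print (sum xs)
```

Because every action has finished before the consumer sees the first element, the whole list has to exist in memory at that point.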
Monadic streams solve this problem. To be able to consume elements from
`replicateM` as it produces them, we need a monadic representation of the stream
and stream processing operations that can consume the stream elements lazily,
one at a time. Streamly provides a monadic representation for lists with the
same API. So we can represent the previous example in a streaming fashion by
replacing the list combinators with the corresponding streamly combinators:
S.replicateM 10 (return 1)
& S.zipWith (+) (S.fromList [1..10])
& S.map (+1)
& S.filter odd
& S.sum
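For reference, the same pipeline as a self-contained program. This is a sketch that assumes a streamly release from the 0.6/0.7 series, in which these combinators are exported from `Streamly.Prelude`; the name `pipeline` is illustrative.

```haskell
import Data.Function ((&))
import qualified Streamly.Prelude as S

-- The streaming pipeline from the text, run in IO.
-- Elements flow through the stages one at a time.
pipeline :: IO Int
pipeline =
      S.replicateM 10 (return 1)
    & S.zipWith (+) (S.fromList [1..10])
    & S.map (+1)
    & S.filter odd
    & S.sum

main :: IO ()
main = pipeline >>= print  -- prints 35
```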
Streamly's monadic streams are a generalization of pure streams (aka lists) and therefore can represent pure streams just as easily, making lists a special case.
`SerialT Identity a` can be used in place of `[a]` with equivalent or better
performance and an almost identical interface, apart from a few differences.
The `OverloadedLists` extension can make the difference even less significant.
Lists are constructed using `[]` and `:`.
> "hello" : "world" : []
["hello","world"]
Pure streams are constructed using `S.nil` (corresponds to `[]`) and `S.cons`
or `.:` (corresponds to `:`):
import Data.Functor.Identity (Identity)
import Streamly
import Streamly.Prelude ((.:))
import qualified Streamly.Prelude as S
> "hello" .: "world" .: S.nil :: SerialT Identity String
fromList ["hello","world"]
The crucial difference is that lists are built using constructors whereas
streams are built using functions. `S.nil` and `S.cons` are functions.
Therefore, you cannot directly pattern match on streams. However, the as yet
unexposed `Streamly.List` module provides `Nil` and `Cons` pattern synonyms,
corresponding to the list `[]` and `:` constructors, for pattern matching.
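Without pattern synonyms, a stream can still be taken apart with a function. A minimal sketch, assuming `S.uncons` from `Streamly.Prelude` as found in the 0.6/0.7 series; the names `greeting` and `headAndRest` are illustrative.

```haskell
import Data.Functor.Identity (Identity, runIdentity)
import Streamly (SerialT)
import Streamly.Prelude ((.:))
import qualified Streamly.Prelude as S

greeting :: SerialT Identity String
greeting = "hello" .: "world" .: S.nil

-- Split a pure stream into its first element and the remaining elements,
-- playing the role of a (x : xs) pattern match on a list.
headAndRest :: SerialT Identity a -> Maybe (a, [a])
headAndRest s = runIdentity $ do
    r <- S.uncons s
    case r of
        Nothing -> pure Nothing
        Just (x, rest) -> do
            xs <- S.toList rest
            pure (Just (x, xs))

main :: IO ()
main = print (headAndRest greeting)
```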
`SerialT Identity a` is an instance of the `Show`, `Read`, `Eq`, `Ord`, `IsList`,
`Foldable` and `Traversable` type classes. Furthermore, `SerialT Identity Char`
is an instance of `IsString`. Use of `OverloadedLists` and `OverloadedStrings`
can make the use of streams in place of lists quite convenient.
Along with these we can use combinators from `Streamly.Prelude` to perform all
list operations on pure streams.
Lists:
> replicate 10 1
> map (+1) $ replicate 10 1
> length $ replicate 10 1
Pure streams are almost identical:
> S.replicate 10 1 :: SerialT Identity Int
> S.map (+1) $ S.replicate 10 1 :: SerialT Identity Int
> length (S.replicate 10 1 :: SerialT Identity Int)
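The instances mentioned above can be combined with list and string literals. The following sketch assumes the `IsList`, `IsString`, `Eq` and `Foldable` instances behave as described in this document for the 0.6/0.7 series; the names `numbers` and `word` are illustrative.

```haskell
{-# LANGUAGE OverloadedLists #-}
{-# LANGUAGE OverloadedStrings #-}

import Data.Foldable (toList)
import Data.Functor.Identity (Identity)
import Streamly (SerialT)
import qualified Streamly.Prelude as S

numbers :: SerialT Identity Int
numbers = [1, 2, 3]   -- list literal, via the IsList instance

word :: SerialT Identity Char
word = "hello"        -- string literal, via the IsString instance

main :: IO ()
main = do
    print (sum numbers)                      -- Foldable: 6
    print (toList (S.map (* 2) numbers))     -- [2,4,6]
    print (numbers == S.fromList [1, 2, 3])  -- Eq: True
    print (length word)                      -- Foldable: 5
```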
Lists have great performance as long as we perform only pure operations on them. When monadic operations are performed on lists, they may accumulate large or potentially unbounded lists in memory, degrading performance massively.
Such "unsafe" list operations exist in `Control.Monad`, `Data.Traversable`, and
`Data.Foldable` in `base`. They are typically found with an `M` suffix.
Streamly provides monadic streams, and its equivalents of the above-mentioned monadic list operations work on streams in a streaming fashion, without accumulating the whole sequence.
The following program uses unsafe monadic operations on lists. Run these
examples with the `+RTS -s` options to see the memory usage:
import Control.Monad
main = do
    xs <- replicateM 10000000 (pure 1)
    ys <- mapM (\x -> return $ x + 1) xs
    print $ length ys
Streams:
import Streamly
import qualified Streamly.Prelude as S
main = do
    n <- S.length
          $ S.mapM (\x -> return $ x + 1)
          $ S.replicateM 10000000 (pure 1)
    print n
Another unsafe category of operations for lists is stream splitting operations,
e.g. `lines`, `splitAt` and `group`. Such operations may also accumulate
unbounded data in memory.
Stream splitting operations in streamly are streaming in nature and can work without accumulating any data in memory.
Monadic streams are nothing but lists made monadic. We can construct streams using monadic actions, just the way pure lists are constructed:
> import Streamly.Prelude ((|:))
> import qualified Streamly.Prelude as S
> S.toList $ getLine |: getLine |: S.nil
hello
world
["hello","world"]
`|:` is the monadic construction equivalent of `:`.
The monadic action `getLine` is executed twice in sequence: the first execution
gets "hello" and the second gets "world" from the terminal, and a stream
consisting of those strings is produced.
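The sequential execution order can also be checked without terminal input. In this sketch the `IORef` counter and the name `twoTicks` are illustrative stand-ins for `getLine`; `|:`, `S.nil` and `S.toList` are the combinators used above.

```haskell
import Data.IORef (atomicModifyIORef', newIORef)
import Streamly.Prelude ((|:))
import qualified Streamly.Prelude as S

-- Build a stream from the same counter-bumping action used twice.
-- Each execution of the action returns the next counter value.
twoTicks :: IO [Int]
twoTicks = do
    ref <- newIORef 0
    let tick = atomicModifyIORef' ref (\n -> (n + 1, n + 1))
    S.toList $ tick |: tick |: S.nil

main :: IO ()
main = twoTicks >>= print  -- prints [1,2]
```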
Streamly streams are not just monadic lists but they are concurrent monadic lists. All the monadic streaming operations provided by streamly are inherently concurrent. Concurrent behavior can be enabled by just using an appropriate modifying combinator, without any other change to the code.
The following code prints one element every second:
import Control.Concurrent
import Streamly
import qualified Streamly.Prelude as S
func = S.mapM (\x -> threadDelay 1000000 >> print x) $ S.replicate 10 1
main = runStream func
To run it concurrently, just run the same code with the `asyncly` combinator:
main = runStream $ asyncly func
The `mapM` combinator now maps the monadic delay action concurrently over all
the stream elements. It prints all 10 elements after one second because all
the delay actions run concurrently. Alternatively we can write:
func = asyncly $ S.replicateM 10 (threadDelay 1000000 >> print 1)
main = runStream func
Here, the `replicateM` operation replicates the action concurrently. In real
applications this could be a request to a web server that you may want to
perform multiple times concurrently.
The following figures show the ratio of time and memory consumed by `[Int]`
(`base-4.12`) vs `SerialT Identity Int` (streamly@00c7613) for exactly the
same operation. `5x` on the y axis means lists take 5 times the resources that
streamly takes, whereas `1/5x` means lists take one fifth of the resources that
streamly takes. We only show those operations that are at least 10% better or
worse in one library compared to the other. The operations are shown in sorted
order, from the list's worst performing ones on the left to its best ones on
the right.
By using monadic streams instead of lists you get the following benefits:
- Use just one library for pure or monadic lists. There is no difference between pure and monadic code; it's all the same.
- You can use a monadic action anywhere in the code e.g. printing a warning or error message or any other effectful operations. This is not possible in pure list code.
- You can make the code concurrent whenever you want without having to modify it.
- You do not have the risk of using unsafe/accumulating list operations degrading performance massively and resulting in space leaks.
- Streamly provides safe versions of partial list functions like `head`, `tail` etc.
- You get better fusion and therefore better performance for many operations; the only operation which is significantly faster for lists is `concatMap`.
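As an illustration of the safe versions of partial functions, here is a minimal sketch assuming `S.head` from `Streamly.Prelude` in the 0.6/0.7 series, which returns a `Maybe` instead of failing on an empty stream. The name `safeHead` is illustrative.

```haskell
import Data.Functor.Identity (Identity, runIdentity)
import Streamly (SerialT)
import qualified Streamly.Prelude as S

-- Head of a pure stream; Nothing for the empty stream, where
-- Prelude's head on an empty list would throw an exception.
safeHead :: SerialT Identity Int -> Maybe Int
safeHead = runIdentity . S.head

main :: IO ()
main = do
    print (safeHead S.nil)                   -- Nothing
    print (safeHead (S.fromList [1, 2, 3]))  -- Just 1
```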