Skip to content

Commit

Permalink
Make the fetch function return a Maybe type in demux-like functions
Browse files Browse the repository at this point in the history
  • Loading branch information
adithyaov committed Jan 2, 2025
1 parent 37f9565 commit bbf4ff5
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 55 deletions.
44 changes: 25 additions & 19 deletions benchmark/Streamly/Benchmark/Data/Fold.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# OPTIONS_GHC -fno-warn-warnings-deprecations #-}
{-# OPTIONS_GHC -Wno-orphans #-}

#undef FUSION_CHECK
Expand Down Expand Up @@ -48,6 +47,7 @@ import Streamly.Internal.Data.MutArray (MutArray)

import qualified Streamly.Internal.Data.Array as Array
import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Internal.Data.Scanl as Scanl
import qualified Streamly.Internal.Data.Fold as Fold
import qualified Streamly.Internal.Data.Parser as Parser
import qualified Streamly.Internal.Data.Pipe as Pipe
Expand Down Expand Up @@ -130,14 +130,14 @@ filter _ = Stream.fold (FL.filter even FL.drain)

{-# INLINE scanMaybe #-}
scanMaybe :: Monad m => Int -> Stream m Int -> m ()
scanMaybe _ = Stream.fold (FL.scanMaybe (FL.filtering even) FL.drain)
scanMaybe _ = Stream.fold (FL.postscanlMaybe (Scanl.filtering even) FL.drain)

{-# INLINE scanMaybe2 #-}
scanMaybe2 :: Monad m => Int -> Stream m Int -> m ()
scanMaybe2 _ =
Stream.fold
$ FL.scanMaybe (FL.filtering even)
$ FL.scanMaybe (FL.filtering odd) FL.drain
$ FL.postscanlMaybe (Scanl.filtering even)
$ FL.postscanlMaybe (Scanl.filtering odd) FL.drain

-------------------------------------------------------------------------------
-- Splitting in two
Expand Down Expand Up @@ -423,18 +423,18 @@ partitionByMinM =

{-# INLINE demuxToMap #-}
demuxToMap :: (Monad m, Ord k) =>
(a -> k) -> (a -> m (Fold m a b)) -> Stream m a -> m (Map k b)
demuxToMap f g = Stream.fold (FL.demuxToContainer f g)
(a -> k) -> (k -> m (Maybe (Fold m a b))) -> Stream m a -> m (Map k b)
demuxToMap f g = Stream.fold (FL.demuxerToContainer f g)

{-# INLINE demuxToIntMap #-}
demuxToIntMap :: Monad m =>
(a -> Int) -> (a -> m (Fold m a b)) -> Stream m a -> m (IntMap b)
demuxToIntMap f g = Stream.fold (FL.demuxToContainer f g)
(a -> Int) -> (Int -> m (Maybe (Fold m a b))) -> Stream m a -> m (IntMap b)
demuxToIntMap f g = Stream.fold (FL.demuxerToContainer f g)

{-# INLINE demuxToMapIO #-}
demuxToMapIO :: (MonadIO m, Ord k) =>
(a -> k) -> (a -> m (Fold m a b)) -> Stream m a -> m (Map k b)
demuxToMapIO f g = Stream.fold (FL.demuxToContainerIO f g)
(a -> k) -> (k -> m (Maybe (Fold m a b))) -> Stream m a -> m (Map k b)
demuxToMapIO f g = Stream.fold (FL.demuxerToContainerIO f g)

{-# INLINE toMap #-}
toMap ::
Expand Down Expand Up @@ -506,9 +506,9 @@ o_1_space_serial_elimination :: Int -> [Benchmark]
o_1_space_serial_elimination value =
[ bgroup "elimination"
[ benchIOSink value "drain" (Stream.fold FL.drain)
, benchIOSink value "drainBy" (Stream.fold (FL.drainBy return))
, benchIOSink value "drainBy" (Stream.fold (FL.drainMapM return))
, benchIOSink value "drainN" (Stream.fold (FL.drainN value))
, benchIOSink value "last" (Stream.fold FL.last)
, benchIOSink value "last" (Stream.fold FL.latest)
, benchIOSink value "length" (Stream.fold FL.length)
, benchIOSink value "top" (Stream.fold $ FL.top 10)
, benchIOSink value "bottom" (Stream.fold $ FL.bottom 10)
Expand All @@ -523,6 +523,11 @@ o_1_space_serial_elimination value =
value
"mean"
(Stream.fold FL.mean . fmap (fromIntegral :: Int -> Double))
{-
-- These are already benchmarked in streamly-statistics package. If we
-- still want to keep these tests here, perhaps we should move them to a
-- different module so we can remove -fno-warn-warnings-deprecations.
, benchIOSink
value
"variance"
Expand All @@ -531,6 +536,7 @@ o_1_space_serial_elimination value =
value
"stdDev"
(Stream.fold FL.stdDev . fmap (fromIntegral :: Int -> Double))
-}
, benchIOSink
value
"mconcat"
Expand Down Expand Up @@ -601,15 +607,15 @@ o_1_space_serial_transformation value =
, benchIOSink
value
"fold-scan"
(Stream.fold $ FL.scan FL.sum FL.drain)
(Stream.fold $ FL.scanl Scanl.sum FL.drain)
, benchIOSink
value
"fold-scanMany"
(Stream.fold $ FL.scanMany (FL.take 2 FL.drain) FL.drain)
(Stream.fold $ FL.scanlMany (Scanl.take 2 Scanl.drain) FL.drain)
, benchIOSink
value
"fold-postscan"
(Stream.fold $ FL.postscan FL.sum FL.drain)
(Stream.fold $ FL.postscanl Scanl.sum FL.drain)
]
]

Expand Down Expand Up @@ -665,11 +671,11 @@ o_n_heap_serial value =
, bgroup "key-value"
[
benchIOSink value "demuxToMap (64 buckets) [sum, length]"
$ demuxToMap (getKey 64) (getFold . getKey 64)
$ demuxToMap (getKey 64) getFold
, benchIOSink value "demuxToIntMap (64 buckets) [sum, length]"
$ demuxToIntMap (getKey 64) (getFold . getKey 64)
$ demuxToIntMap (getKey 64) getFold
, benchIOSink value "demuxToMapIO (64 buckets) [sum, length]"
$ demuxToMapIO (getKey 64) (getFold . getKey 64)
$ demuxToMapIO (getKey 64) getFold

-- classify: immutable
, benchIOSink value "toMap (64 buckets) sum"
Expand All @@ -694,7 +700,7 @@ o_n_heap_serial value =
getKey buckets = (`mod` buckets)

getFold k =
return $ case k of
return $ Just $ case k of
0 -> FL.sum
1 -> FL.length
_ -> FL.length
Expand Down
54 changes: 31 additions & 23 deletions core/src/Streamly/Internal/Data/Fold/Container.hs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ demuxGeneric getKey getFold =
{-# INLINE demuxerToContainer #-}
demuxerToContainer :: (Monad m, IsMap f, Traversable f) =>
(a -> Key f)
-> (Key f -> m (Fold m a b))
-> (Key f -> m (Maybe (Fold m a b)))
-> Fold m a (f b)
demuxerToContainer getKey getFold =
Fold (\s a -> Partial <$> step s a) (Partial <$> initial) undefined final
Expand Down Expand Up @@ -388,8 +388,10 @@ demuxerToContainer getKey getFold =
let k = getKey a
case IsMap.mapLookup k kv of
Nothing -> do
fld <- getFold k
runFold kv kv1 fld (k, a)
mfld <- getFold k
case mfld of
Nothing -> pure $ Tuple' kv kv1
Just fld -> runFold kv kv1 fld (k, a)
Just f -> runFold kv kv1 f (k, a)

final (Tuple' kv kv1) = do
Expand All @@ -408,7 +410,7 @@ demuxerToContainer getKey getFold =
{-# INLINE demuxScanGeneric #-}
demuxScanGeneric :: (Monad m, IsMap f, Traversable f) =>
(a -> Key f)
-> (Key f -> m (Fold m a b))
-> (Key f -> m (Maybe (Fold m a b)))
-> Scanl m a (m (f b), Maybe (Key f, b))
demuxScanGeneric getKey getFold =
Scanl (\s a -> Partial <$> step s a) (Partial <$> initial) extract final
Expand Down Expand Up @@ -439,8 +441,10 @@ demuxScanGeneric getKey getFold =
let k = getKey a
case IsMap.mapLookup k kv of
Nothing -> do
fld <- getFold k
runFold kv fld (k, a)
mfld <- getFold k
case mfld of
Nothing -> pure $ Tuple' kv Nothing
Just fld -> runFold kv fld (k, a)
Just f -> runFold kv f (k, a)

extract (Tuple' kv x) = return (Prelude.mapM f kv, x)
Expand Down Expand Up @@ -500,7 +504,7 @@ demux = demuxGeneric
{-# INLINE demuxUsingMap #-}
demuxUsingMap :: (Monad m, Ord k) =>
(a -> k)
-> (k -> m (Fold m a b))
-> (k -> m (Maybe (Fold m a b)))
-> Scanl m a (m (Map k b), Maybe (k, b))
demuxUsingMap = demuxScanGeneric

Expand All @@ -512,7 +516,7 @@ demuxUsingMap = demuxScanGeneric
{-# INLINE demuxScan #-}
demuxScan :: (Monad m, Ord k) =>
(a -> k)
-> (k -> m (Fold m a b))
-> (k -> m (Maybe (Fold m a b)))
-> Scanl m a (Maybe (k, b))
demuxScan getKey = fmap snd . demuxUsingMap getKey

Expand Down Expand Up @@ -601,7 +605,7 @@ demuxGenericIO getKey getFold =
{-# INLINE demuxerToContainerIO #-}
demuxerToContainerIO :: (MonadIO m, IsMap f, Traversable f) =>
(a -> Key f)
-> (Key f -> m (Fold m a b))
-> (Key f -> m (Maybe (Fold m a b)))
-> Fold m a (f b)
demuxerToContainerIO getKey getFold =
Fold (\s a -> Partial <$> step s a) (Partial <$> initial) undefined final
Expand Down Expand Up @@ -647,8 +651,10 @@ demuxerToContainerIO getKey getFold =
let k = getKey a
case IsMap.mapLookup k kv of
Nothing -> do
f <- getFold k
initFold kv kv1 f (k, a)
res <- getFold k
case res of
Nothing -> pure $ Tuple' kv kv1
Just f -> initFold kv kv1 f (k, a)
Just ref -> do
f <- liftIO $ readIORef ref
runFold kv kv1 ref f (k, a)
Expand All @@ -675,7 +681,7 @@ demuxerToContainerIO getKey getFold =
{-# INLINE demuxScanGenericIO #-}
demuxScanGenericIO :: (MonadIO m, IsMap f, Traversable f) =>
(a -> Key f)
-> (Key f -> m (Fold m a b))
-> (Key f -> m (Maybe (Fold m a b)))
-> Scanl m a (m (f b), Maybe (Key f, b))
demuxScanGenericIO getKey getFold =
Scanl (\s a -> Partial <$> step s a) (Partial <$> initial) extract final
Expand Down Expand Up @@ -721,8 +727,10 @@ demuxScanGenericIO getKey getFold =
let k = getKey a
case IsMap.mapLookup k kv of
Nothing -> do
f <- getFold k
initFold kv f (k, a)
res <- getFold k
case res of
Nothing -> pure $ Tuple' kv Nothing
Just f -> initFold kv f (k, a)
Just ref -> do
f <- liftIO $ readIORef ref
runFold kv ref f (k, a)
Expand Down Expand Up @@ -766,7 +774,7 @@ demuxIO = demuxGenericIO
{-# INLINE demuxUsingMapIO #-}
demuxUsingMapIO :: (MonadIO m, Ord k) =>
(a -> k)
-> (k -> m (Fold m a b))
-> (k -> m (Maybe (Fold m a b)))
-> Scanl m a (m (Map k b), Maybe (k, b))
demuxUsingMapIO = demuxScanGenericIO

Expand All @@ -779,7 +787,7 @@ demuxUsingMapIO = demuxScanGenericIO
{-# INLINE demuxScanIO #-}
demuxScanIO :: (MonadIO m, Ord k) =>
(a -> k)
-> (k -> m (Fold m a b))
-> (k -> m (Maybe (Fold m a b)))
-> Scanl m a (Maybe (k, b))
demuxScanIO getKey = fmap snd . demuxUsingMapIO getKey

Expand Down Expand Up @@ -839,7 +847,7 @@ demuxToMap = demuxToContainer
--
{-# INLINE demuxerToMap #-}
demuxerToMap :: (Monad m, Ord k) =>
(a -> k) -> (k -> m (Fold m a b)) -> Fold m a (Map k b)
(a -> k) -> (k -> m (Maybe (Fold m a b))) -> Fold m a (Map k b)
demuxerToMap = demuxerToContainer

{-# DEPRECATED demuxToContainerIO "Use demuxerToContainerIO instead" #-}
Expand Down Expand Up @@ -869,13 +877,13 @@ demuxToMapIO = demuxToContainerIO
--
{-# INLINE demuxerToMapIO #-}
demuxerToMapIO :: (MonadIO m, Ord k) =>
(a -> k) -> (k -> m (Fold m a b)) -> Fold m a (Map k b)
(a -> k) -> (k -> m (Maybe (Fold m a b))) -> Fold m a (Map k b)
demuxerToMapIO = demuxerToContainerIO

{-# INLINE demuxKvToContainer #-}
demuxKvToContainer :: (Monad m, IsMap f, Traversable f) =>
(Key f -> m (Fold m a b)) -> Fold m (Key f, a) (f b)
demuxKvToContainer f = demuxerToContainer fst (fmap (lmap snd) . f)
(Key f -> m (Maybe (Fold m a b))) -> Fold m (Key f, a) (f b)
demuxKvToContainer f = demuxerToContainer fst (fmap (fmap (lmap snd)) . f)

-- | Fold a stream of key value pairs using a function that maps keys to folds.
--
Expand All @@ -887,8 +895,8 @@ demuxKvToContainer f = demuxerToContainer fst (fmap (lmap snd) . f)
--
-- >>> import Data.Map (Map)
-- >>> :{
-- let f "SUM" = return Fold.sum
-- f _ = return Fold.product
-- let f "SUM" = return (Just Fold.sum)
-- f _ = return (Just Fold.product)
-- input = Stream.fromList [("SUM",1),("PRODUCT",2),("SUM",3),("PRODUCT",4)]
-- in Stream.fold (Fold.demuxKvToMap f) input :: IO (Map String Int)
-- :}
Expand All @@ -897,7 +905,7 @@ demuxKvToContainer f = demuxerToContainer fst (fmap (lmap snd) . f)
-- /Pre-release/
{-# INLINE demuxKvToMap #-}
demuxKvToMap :: (Monad m, Ord k) =>
(k -> m (Fold m a b)) -> Fold m (k, a) (Map k b)
(k -> m (Maybe (Fold m a b))) -> Fold m (k, a) (Map k b)
demuxKvToMap = demuxKvToContainer

------------------------------------------------------------------------------
Expand Down
24 changes: 14 additions & 10 deletions core/src/Streamly/Internal/Data/Scanl/Container.hs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ countDistinctInt = fmap (\(Tuple' _ n) -> n) $ foldl' step initial
{-# INLINE demuxGeneric #-}
demuxGeneric :: (Monad m, IsMap f, Traversable f) =>
(a -> Key f)
-> (Key f -> m (Scanl m a b))
-> (Key f -> m (Maybe (Scanl m a b)))
-> Scanl m a (m (f b), Maybe (Key f, b))
demuxGeneric getKey getFold =
Scanl (\s a -> Partial <$> step s a) (Partial <$> initial) extract final
Expand Down Expand Up @@ -309,8 +309,10 @@ demuxGeneric getKey getFold =
let k = getKey a
case IsMap.mapLookup k kv of
Nothing -> do
fld <- getFold k
runFold kv fld (k, a)
mfld <- getFold k
case mfld of
Nothing -> pure $ Tuple' kv Nothing
Just fld -> runFold kv fld (k, a)
Just f -> runFold kv f (k, a)

extract (Tuple' kv x) = return (Prelude.mapM f kv, x)
Expand All @@ -336,7 +338,7 @@ demuxGeneric getKey getFold =
{-# INLINE demuxUsingMap #-}
demuxUsingMap :: (Monad m, Ord k) =>
(a -> k)
-> (k -> m (Scanl m a b))
-> (k -> m (Maybe (Scanl m a b)))
-> Scanl m a (m (Map k b), Maybe (k, b))
demuxUsingMap = demuxGeneric

Expand Down Expand Up @@ -365,7 +367,7 @@ demuxUsingMap = demuxGeneric
{-# INLINE demux #-}
demux :: (Monad m, Ord k) =>
(a -> k)
-> (k -> m (Scanl m a b))
-> (k -> m (Maybe (Scanl m a b)))
-> Scanl m a (Maybe (k, b))
demux getKey = fmap snd . demuxUsingMap getKey

Expand All @@ -380,7 +382,7 @@ demux getKey = fmap snd . demuxUsingMap getKey
{-# INLINE demuxGenericIO #-}
demuxGenericIO :: (MonadIO m, IsMap f, Traversable f) =>
(a -> Key f)
-> (Key f -> m (Scanl m a b))
-> (Key f -> m (Maybe (Scanl m a b)))
-> Scanl m a (m (f b), Maybe (Key f, b))
demuxGenericIO getKey getFold =
Scanl (\s a -> Partial <$> step s a) (Partial <$> initial) extract final
Expand Down Expand Up @@ -429,8 +431,10 @@ demuxGenericIO getKey getFold =
let k = getKey a
case IsMap.mapLookup k kv of
Nothing -> do
f <- getFold k
initFold kv f (k, a)
res <- getFold k
case res of
Nothing -> pure $ Tuple' kv Nothing
Just f -> initFold kv f (k, a)
Just ref -> do
f <- liftIO $ readIORef ref
runFold kv ref f (k, a)
Expand Down Expand Up @@ -460,7 +464,7 @@ demuxGenericIO getKey getFold =
{-# INLINE demuxUsingMapIO #-}
demuxUsingMapIO :: (MonadIO m, Ord k) =>
(a -> k)
-> (k -> m (Scanl m a b))
-> (k -> m (Maybe (Scanl m a b)))
-> Scanl m a (m (Map k b), Maybe (k, b))
demuxUsingMapIO = demuxGenericIO

Expand All @@ -470,7 +474,7 @@ demuxUsingMapIO = demuxGenericIO
{-# INLINE demuxIO #-}
demuxIO :: (MonadIO m, Ord k) =>
(a -> k)
-> (k -> m (Scanl m a b))
-> (k -> m (Maybe (Scanl m a b)))
-> Scanl m a (Maybe (k, b))
demuxIO getKey = fmap snd . demuxUsingMapIO getKey

Expand Down
6 changes: 3 additions & 3 deletions test/Streamly/Test/Data/Fold.hs
Original file line number Diff line number Diff line change
Expand Up @@ -596,9 +596,9 @@ foldBreak ls = monadicIO $ do

demux :: Expectation
demux =
let table "SUM" = return Fold.sum
table "PRODUCT" = return Fold.product
table _ = return Fold.length
let table "SUM" = return $ Just Fold.sum
table "PRODUCT" = return $ Just Fold.product
table _ = return $ Just Fold.length
input = Stream.fromList (
[ ("SUM", 1)
, ("abc", 1)
Expand Down

0 comments on commit bbf4ff5

Please sign in to comment.