From bbf4ff5f27ab926fcaaf0951bca01aa6b570e0f2 Mon Sep 17 00:00:00 2001 From: Adithya Kumar Date: Sat, 21 Dec 2024 03:34:44 +0530 Subject: [PATCH] Make the fetch function return a Maybe type in demux-like functions --- benchmark/Streamly/Benchmark/Data/Fold.hs | 44 ++++++++------- .../Streamly/Internal/Data/Fold/Container.hs | 54 +++++++++++-------- .../Streamly/Internal/Data/Scanl/Container.hs | 24 +++++---- test/Streamly/Test/Data/Fold.hs | 6 +-- 4 files changed, 73 insertions(+), 55 deletions(-) diff --git a/benchmark/Streamly/Benchmark/Data/Fold.hs b/benchmark/Streamly/Benchmark/Data/Fold.hs index 48d6b30540..8f3a0d9f7c 100644 --- a/benchmark/Streamly/Benchmark/Data/Fold.hs +++ b/benchmark/Streamly/Benchmark/Data/Fold.hs @@ -10,7 +10,6 @@ {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} -{-# OPTIONS_GHC -fno-warn-warnings-deprecations #-} {-# OPTIONS_GHC -Wno-orphans #-} #undef FUSION_CHECK @@ -48,6 +47,7 @@ import Streamly.Internal.Data.MutArray (MutArray) import qualified Streamly.Internal.Data.Array as Array import qualified Streamly.Internal.Data.Fold as FL +import qualified Streamly.Internal.Data.Scanl as Scanl import qualified Streamly.Internal.Data.Fold as Fold import qualified Streamly.Internal.Data.Parser as Parser import qualified Streamly.Internal.Data.Pipe as Pipe @@ -130,14 +130,14 @@ filter _ = Stream.fold (FL.filter even FL.drain) {-# INLINE scanMaybe #-} scanMaybe :: Monad m => Int -> Stream m Int -> m () -scanMaybe _ = Stream.fold (FL.scanMaybe (FL.filtering even) FL.drain) +scanMaybe _ = Stream.fold (FL.postscanlMaybe (Scanl.filtering even) FL.drain) {-# INLINE scanMaybe2 #-} scanMaybe2 :: Monad m => Int -> Stream m Int -> m () scanMaybe2 _ = Stream.fold - $ FL.scanMaybe (FL.filtering even) - $ FL.scanMaybe (FL.filtering odd) FL.drain + $ FL.postscanlMaybe (Scanl.filtering even) + $ FL.postscanlMaybe (Scanl.filtering odd) FL.drain ------------------------------------------------------------------------------- -- Splitting in two @@ -423,18 +423,18 @@ partitionByMinM = {-# INLINE demuxToMap #-} demuxToMap :: (Monad m, Ord k) => - (a -> k) -> (a -> m (Fold m a b)) -> Stream m a -> m (Map k b) -demuxToMap f g = Stream.fold (FL.demuxToContainer f g) + (a -> k) -> (k -> m (Maybe (Fold m a b))) -> Stream m a -> m (Map k b) +demuxToMap f g = Stream.fold (FL.demuxerToContainer f g) {-# INLINE demuxToIntMap #-} demuxToIntMap :: Monad m => - (a -> Int) -> (a -> m (Fold m a b)) -> Stream m a -> m (IntMap b) -demuxToIntMap f g = Stream.fold (FL.demuxToContainer f g) + (a -> Int) -> (Int -> m (Maybe (Fold m a b))) -> Stream m a -> m (IntMap b) +demuxToIntMap f g = Stream.fold (FL.demuxerToContainer f g) {-# INLINE demuxToMapIO #-} demuxToMapIO :: (MonadIO m, Ord k) => - (a -> k) -> (a -> m (Fold m a b)) -> Stream m a -> m (Map k b) -demuxToMapIO f g = Stream.fold (FL.demuxToContainerIO f g) + (a -> k) -> (k -> m (Maybe (Fold m a b))) -> Stream m a -> m (Map k b) +demuxToMapIO f g = Stream.fold (FL.demuxerToContainerIO f g) {-# INLINE toMap #-} toMap :: @@ -506,9 +506,9 @@ o_1_space_serial_elimination :: Int -> [Benchmark] o_1_space_serial_elimination value = [ bgroup "elimination" [ benchIOSink value "drain" (Stream.fold FL.drain) - , benchIOSink value "drainBy" (Stream.fold (FL.drainBy return)) + , benchIOSink value "drainBy" (Stream.fold (FL.drainMapM return)) , benchIOSink value "drainN" (Stream.fold (FL.drainN value)) - , benchIOSink value "last" (Stream.fold FL.last) + , benchIOSink value "last" (Stream.fold FL.latest) , benchIOSink value "length" (Stream.fold FL.length) , benchIOSink value "top" (Stream.fold $ FL.top 10) , benchIOSink value "bottom" (Stream.fold $ FL.bottom 10) @@ -523,6 +523,11 @@ o_1_space_serial_elimination value = value "mean" (Stream.fold FL.mean . fmap (fromIntegral :: Int -> Double)) +{- + -- These are already benchmarked in streamly-statistics package. If we + -- still want to keep these tests here, perhaps we should move them to a + -- different module so we can remove -fno-warn-warnings-deprecations. + , benchIOSink value "variance" @@ -531,6 +536,7 @@ o_1_space_serial_elimination value = value "stdDev" (Stream.fold FL.stdDev . fmap (fromIntegral :: Int -> Double)) +-} , benchIOSink value "mconcat" @@ -601,15 +607,15 @@ o_1_space_serial_transformation value = , benchIOSink value "fold-scan" - (Stream.fold $ FL.scan FL.sum FL.drain) + (Stream.fold $ FL.scanl Scanl.sum FL.drain) , benchIOSink value "fold-scanMany" - (Stream.fold $ FL.scanMany (FL.take 2 FL.drain) FL.drain) + (Stream.fold $ FL.scanlMany (Scanl.take 2 Scanl.drain) FL.drain) , benchIOSink value "fold-postscan" - (Stream.fold $ FL.postscan FL.sum FL.drain) + (Stream.fold $ FL.postscanl Scanl.sum FL.drain) ] ] @@ -665,11 +671,11 @@ o_n_heap_serial value = , bgroup "key-value" [ benchIOSink value "demuxToMap (64 buckets) [sum, length]" - $ demuxToMap (getKey 64) (getFold . getKey 64) + $ demuxToMap (getKey 64) getFold , benchIOSink value "demuxToIntMap (64 buckets) [sum, length]" - $ demuxToIntMap (getKey 64) (getFold . getKey 64) + $ demuxToIntMap (getKey 64) getFold , benchIOSink value "demuxToMapIO (64 buckets) [sum, length]" - $ demuxToMapIO (getKey 64) (getFold . getKey 64) + $ demuxToMapIO (getKey 64) getFold -- classify: immutable , benchIOSink value "toMap (64 buckets) sum" @@ -694,7 +700,7 @@ o_n_heap_serial value = getKey buckets = (`mod` buckets) getFold k = - return $ case k of + return $ Just $ case k of 0 -> FL.sum 1 -> FL.length _ -> FL.length diff --git a/core/src/Streamly/Internal/Data/Fold/Container.hs b/core/src/Streamly/Internal/Data/Fold/Container.hs index 390f1d4a24..b09c90890a 100644 --- a/core/src/Streamly/Internal/Data/Fold/Container.hs +++ b/core/src/Streamly/Internal/Data/Fold/Container.hs @@ -354,7 +354,7 @@ demuxGeneric getKey getFold = {-# INLINE demuxerToContainer #-} demuxerToContainer :: (Monad m, IsMap f, Traversable f) => (a -> Key f) - -> (Key f -> m (Fold m a b)) + -> (Key f -> m (Maybe (Fold m a b))) -> Fold m a (f b) demuxerToContainer getKey getFold = Fold (\s a -> Partial <$> step s a) (Partial <$> initial) undefined final @@ -388,8 +388,10 @@ demuxerToContainer getKey getFold = let k = getKey a case IsMap.mapLookup k kv of Nothing -> do - fld <- getFold k - runFold kv kv1 fld (k, a) + mfld <- getFold k + case mfld of + Nothing -> pure $ Tuple' kv kv1 + Just fld -> runFold kv kv1 fld (k, a) Just f -> runFold kv kv1 f (k, a) final (Tuple' kv kv1) = do @@ -408,7 +410,7 @@ demuxerToContainer getKey getFold = {-# INLINE demuxScanGeneric #-} demuxScanGeneric :: (Monad m, IsMap f, Traversable f) => (a -> Key f) - -> (Key f -> m (Fold m a b)) + -> (Key f -> m (Maybe (Fold m a b))) -> Scanl m a (m (f b), Maybe (Key f, b)) demuxScanGeneric getKey getFold = Scanl (\s a -> Partial <$> step s a) (Partial <$> initial) extract final @@ -439,8 +441,10 @@ demuxScanGeneric getKey getFold = let k = getKey a case IsMap.mapLookup k kv of Nothing -> do - fld <- getFold k - runFold kv fld (k, a) + mfld <- getFold k + case mfld of + Nothing -> pure $ Tuple' kv Nothing + Just fld -> runFold kv fld (k, a) Just f -> runFold kv f (k, a) extract (Tuple' kv x) = return (Prelude.mapM f kv, x) @@ -500,7 +504,7 @@ demux = demuxGeneric {-# INLINE demuxUsingMap #-} demuxUsingMap :: (Monad m, Ord k) => (a -> k) - -> (k -> m (Fold m a b)) + -> (k -> m (Maybe (Fold m a b))) -> Scanl m a (m (Map k b), Maybe (k, b)) demuxUsingMap = demuxScanGeneric @@ -512,7 +516,7 @@ demuxUsingMap = demuxScanGeneric {-# INLINE demuxScan #-} demuxScan :: (Monad m, Ord k) => (a -> k) - -> (k -> m (Fold m a b)) + -> (k -> m (Maybe (Fold m a b))) -> Scanl m a (Maybe (k, b)) demuxScan getKey = fmap snd . demuxUsingMap getKey @@ -601,7 +605,7 @@ demuxGenericIO getKey getFold = {-# INLINE demuxerToContainerIO #-} demuxerToContainerIO :: (MonadIO m, IsMap f, Traversable f) => (a -> Key f) - -> (Key f -> m (Fold m a b)) + -> (Key f -> m (Maybe (Fold m a b))) -> Fold m a (f b) demuxerToContainerIO getKey getFold = Fold (\s a -> Partial <$> step s a) (Partial <$> initial) undefined final @@ -647,8 +651,10 @@ demuxerToContainerIO getKey getFold = let k = getKey a case IsMap.mapLookup k kv of Nothing -> do - f <- getFold k - initFold kv kv1 f (k, a) + res <- getFold k + case res of + Nothing -> pure $ Tuple' kv kv1 + Just f -> initFold kv kv1 f (k, a) Just ref -> do f <- liftIO $ readIORef ref runFold kv kv1 ref f (k, a) @@ -675,7 +681,7 @@ demuxerToContainerIO getKey getFold = {-# INLINE demuxScanGenericIO #-} demuxScanGenericIO :: (MonadIO m, IsMap f, Traversable f) => (a -> Key f) - -> (Key f -> m (Fold m a b)) + -> (Key f -> m (Maybe (Fold m a b))) -> Scanl m a (m (f b), Maybe (Key f, b)) demuxScanGenericIO getKey getFold = Scanl (\s a -> Partial <$> step s a) (Partial <$> initial) extract final @@ -721,8 +727,10 @@ demuxScanGenericIO getKey getFold = let k = getKey a case IsMap.mapLookup k kv of Nothing -> do - f <- getFold k - initFold kv f (k, a) + res <- getFold k + case res of + Nothing -> pure $ Tuple' kv Nothing + Just f -> initFold kv f (k, a) Just ref -> do f <- liftIO $ readIORef ref runFold kv ref f (k, a) @@ -766,7 +774,7 @@ demuxIO = demuxGenericIO {-# INLINE demuxUsingMapIO #-} demuxUsingMapIO :: (MonadIO m, Ord k) => (a -> k) - -> (k -> m (Fold m a b)) + -> (k -> m (Maybe (Fold m a b))) -> Scanl m a (m (Map k b), Maybe (k, b)) demuxUsingMapIO = demuxScanGenericIO @@ -779,7 +787,7 @@ demuxUsingMapIO = demuxScanGenericIO {-# INLINE demuxScanIO #-} demuxScanIO :: (MonadIO m, Ord k) => (a -> k) - -> (k -> m (Fold m a b)) + -> (k -> m (Maybe (Fold m a b))) -> Scanl m a (Maybe (k, b)) demuxScanIO getKey = fmap snd . demuxUsingMapIO getKey @@ -839,7 +847,7 @@ demuxToMap = demuxToContainer -- {-# INLINE demuxerToMap #-} demuxerToMap :: (Monad m, Ord k) => - (a -> k) -> (k -> m (Fold m a b)) -> Fold m a (Map k b) + (a -> k) -> (k -> m (Maybe (Fold m a b))) -> Fold m a (Map k b) demuxerToMap = demuxerToContainer {-# DEPRECATED demuxToContainerIO "Use demuxerToContainerIO instead" #-} @@ -869,13 +877,13 @@ demuxToMapIO = demuxToContainerIO -- {-# INLINE demuxerToMapIO #-} demuxerToMapIO :: (MonadIO m, Ord k) => - (a -> k) -> (k -> m (Fold m a b)) -> Fold m a (Map k b) + (a -> k) -> (k -> m (Maybe (Fold m a b))) -> Fold m a (Map k b) demuxerToMapIO = demuxerToContainerIO {-# INLINE demuxKvToContainer #-} demuxKvToContainer :: (Monad m, IsMap f, Traversable f) => - (Key f -> m (Fold m a b)) -> Fold m (Key f, a) (f b) -demuxKvToContainer f = demuxerToContainer fst (fmap (lmap snd) . f) + (Key f -> m (Maybe (Fold m a b))) -> Fold m (Key f, a) (f b) +demuxKvToContainer f = demuxerToContainer fst (fmap (fmap (lmap snd)) . f) -- | Fold a stream of key value pairs using a function that maps keys to folds. -- @@ -887,8 +895,8 @@ demuxKvToContainer f = demuxerToContainer fst (fmap (lmap snd) . f) -- -- >>> import Data.Map (Map) -- >>> :{ --- let f "SUM" = return Fold.sum --- f _ = return Fold.product +-- let f "SUM" = return (Just Fold.sum) +-- f _ = return (Just Fold.product) -- input = Stream.fromList [("SUM",1),("PRODUCT",2),("SUM",3),("PRODUCT",4)] -- in Stream.fold (Fold.demuxKvToMap f) input :: IO (Map String Int) -- :} @@ -897,7 +905,7 @@ demuxKvToContainer f = demuxerToContainer fst (fmap (lmap snd) . f) -- /Pre-release/ {-# INLINE demuxKvToMap #-} demuxKvToMap :: (Monad m, Ord k) => - (k -> m (Fold m a b)) -> Fold m (k, a) (Map k b) + (k -> m (Maybe (Fold m a b))) -> Fold m (k, a) (Map k b) demuxKvToMap = demuxKvToContainer ------------------------------------------------------------------------------ diff --git a/core/src/Streamly/Internal/Data/Scanl/Container.hs b/core/src/Streamly/Internal/Data/Scanl/Container.hs index eeeb94bc5c..1cc6044176 100644 --- a/core/src/Streamly/Internal/Data/Scanl/Container.hs +++ b/core/src/Streamly/Internal/Data/Scanl/Container.hs @@ -274,7 +274,7 @@ countDistinctInt = fmap (\(Tuple' _ n) -> n) $ foldl' step initial {-# INLINE demuxGeneric #-} demuxGeneric :: (Monad m, IsMap f, Traversable f) => (a -> Key f) - -> (Key f -> m (Scanl m a b)) + -> (Key f -> m (Maybe (Scanl m a b))) -> Scanl m a (m (f b), Maybe (Key f, b)) demuxGeneric getKey getFold = Scanl (\s a -> Partial <$> step s a) (Partial <$> initial) extract final @@ -309,8 +309,10 @@ demuxGeneric getKey getFold = let k = getKey a case IsMap.mapLookup k kv of Nothing -> do - fld <- getFold k - runFold kv fld (k, a) + mfld <- getFold k + case mfld of + Nothing -> pure $ Tuple' kv Nothing + Just fld -> runFold kv fld (k, a) Just f -> runFold kv f (k, a) extract (Tuple' kv x) = return (Prelude.mapM f kv, x) @@ -336,7 +338,7 @@ demuxGeneric getKey getFold = {-# INLINE demuxUsingMap #-} demuxUsingMap :: (Monad m, Ord k) => (a -> k) - -> (k -> m (Scanl m a b)) + -> (k -> m (Maybe (Scanl m a b))) -> Scanl m a (m (Map k b), Maybe (k, b)) demuxUsingMap = demuxGeneric @@ -365,7 +367,7 @@ demuxUsingMap = demuxGeneric {-# INLINE demux #-} demux :: (Monad m, Ord k) => (a -> k) - -> (k -> m (Scanl m a b)) + -> (k -> m (Maybe (Scanl m a b))) -> Scanl m a (Maybe (k, b)) demux getKey = fmap snd . demuxUsingMap getKey @@ -380,7 +382,7 @@ demux getKey = fmap snd . demuxUsingMap getKey {-# INLINE demuxGenericIO #-} demuxGenericIO :: (MonadIO m, IsMap f, Traversable f) => (a -> Key f) - -> (Key f -> m (Scanl m a b)) + -> (Key f -> m (Maybe (Scanl m a b))) -> Scanl m a (m (f b), Maybe (Key f, b)) demuxGenericIO getKey getFold = Scanl (\s a -> Partial <$> step s a) (Partial <$> initial) extract final @@ -429,8 +431,10 @@ demuxGenericIO getKey getFold = let k = getKey a case IsMap.mapLookup k kv of Nothing -> do - f <- getFold k - initFold kv f (k, a) + res <- getFold k + case res of + Nothing -> pure $ Tuple' kv Nothing + Just f -> initFold kv f (k, a) Just ref -> do f <- liftIO $ readIORef ref runFold kv ref f (k, a) @@ -460,7 +464,7 @@ demuxGenericIO getKey getFold = {-# INLINE demuxUsingMapIO #-} demuxUsingMapIO :: (MonadIO m, Ord k) => (a -> k) - -> (k -> m (Scanl m a b)) + -> (k -> m (Maybe (Scanl m a b))) -> Scanl m a (m (Map k b), Maybe (k, b)) demuxUsingMapIO = demuxGenericIO @@ -470,7 +474,7 @@ demuxUsingMapIO = demuxGenericIO {-# INLINE demuxIO #-} demuxIO :: (MonadIO m, Ord k) => (a -> k) - -> (k -> m (Scanl m a b)) + -> (k -> m (Maybe (Scanl m a b))) -> Scanl m a (Maybe (k, b)) demuxIO getKey = fmap snd . demuxUsingMapIO getKey diff --git a/test/Streamly/Test/Data/Fold.hs b/test/Streamly/Test/Data/Fold.hs index f31f25e31e..c94c5daea9 100644 --- a/test/Streamly/Test/Data/Fold.hs +++ b/test/Streamly/Test/Data/Fold.hs @@ -596,9 +596,9 @@ foldBreak ls = monadicIO $ do demux :: Expectation demux = - let table "SUM" = return Fold.sum - table "PRODUCT" = return Fold.product - table _ = return Fold.length + let table "SUM" = return $ Just Fold.sum + table "PRODUCT" = return $ Just Fold.product + table _ = return $ Just Fold.length input = Stream.fromList ( [ ("SUM", 1) , ("abc", 1)