diff --git a/src/Streamly/Internal/Data/Stream/IsStream/Top.hs b/src/Streamly/Internal/Data/Stream/IsStream/Top.hs
index f03306c4bd..b2ee66fe5a 100644
--- a/src/Streamly/Internal/Data/Stream/IsStream/Top.hs
+++ b/src/Streamly/Internal/Data/Stream/IsStream/Top.hs
@@ -40,7 +40,7 @@ module Streamly.Internal.Data.Stream.IsStream.Top
     , joinInnerMerge
     , hashInnerJoin
     , leftJoin
-    , mergeLeftJoin
+    , joinLeftMerge
     , hashLeftJoin
     , outerJoin
     , mergeOuterJoin
@@ -383,11 +383,14 @@ hashLeftJoin = undefined
 --
 -- Time: O(m + n)
 --
--- /Unimplemented/
-{-# INLINE mergeLeftJoin #-}
-mergeLeftJoin :: -- Monad m =>
+-- /Pre-release/
+{-# INLINE joinLeftMerge #-}
+joinLeftMerge :: (IsStream t, MonadIO m, Eq a, Eq b) =>
     (a -> b -> Ordering) -> t m a -> t m b -> t m (a, Maybe b)
-mergeLeftJoin _eq _s1 _s2 = undefined
+joinLeftMerge eq s1 =
+    IsStream.fromStreamD
+        . StreamD.joinLeftMerge eq (IsStream.toStreamD s1)
+        . IsStream.toStreamD
 
 -- XXX We can do this concurrently.
 --
diff --git a/src/Streamly/Internal/Data/Stream/StreamD/Nesting.hs b/src/Streamly/Internal/Data/Stream/StreamD/Nesting.hs
index a95fc3aead..6e88e91f2a 100644
--- a/src/Streamly/Internal/Data/Stream/StreamD/Nesting.hs
+++ b/src/Streamly/Internal/Data/Stream/StreamD/Nesting.hs
@@ -146,6 +146,7 @@ module Streamly.Internal.Data.Stream.StreamD.Nesting
     , unionBySorted
     , differenceBySorted
     , joinInnerMerge
+    , joinLeftMerge
     )
 
 where
@@ -3520,3 +3521,225 @@ joinInnerMerge cmp (Stream stepa ta) (Stream stepb tb) =
                 return Stop
 
     step _ (_, _, _, _, _, _, _, _, _) = return Stop
+
+-------------------------------------------------------------------------------
+-- Left join of sorted streams
+-------------------------------------------------------------------------------
+
+-- | Left join of two streams that are sorted in ascending order with respect
+-- to the comparison function.
+--
+-- Every left element @a@ is emitted at least once: paired with 'Just' each
+-- element of the run of right elements whose first member compares 'EQ' to
+-- it, or once with 'Nothing' when no right element compares 'EQ' to it.
+--
+-- The matching run of right elements is kept in an 'IORef' so that it can be
+-- replayed for repeated left elements. Runs on the right are delimited with
+-- '==' on @b@ and repeats on the left are detected with '==' on @a@, so both
+-- instances are expected to agree with the comparison function. The buffer
+-- is built by prepending, so a run is replayed most recent element first.
+--
+-- The state is the tuple (left state, right state, current a, current b,
+-- previous a, previous b, buffer, phase, replay index). The right state is
+-- 'Nothing' once the right stream has stopped, so it is never stepped again.
+--
+-- /Pre-release/
+{-# INLINE_NORMAL joinLeftMerge #-}
+joinLeftMerge :: (MonadIO m, Eq a, Eq b) =>
+       (a -> b -> Ordering)
+    -> Stream m a -> Stream m b
+    -> Stream m (a, Maybe b)
+joinLeftMerge cmp (Stream stepa ta) (Stream stepb tb) =
+    Stream step
+        ( Just ta, Just tb, Nothing, Nothing
+        , Nothing, Nothing, Nothing, LeftRun, 0 )
+
+    where
+
+    -- State used once the right stream has stopped: from then on only the
+    -- left stream is drained.
+    leftOnly sa a idx =
+        ( Just sa, Nothing, Just a, Nothing
+        , Nothing, Nothing, Nothing, LeftRun, idx )
+
+    {-# INLINE_LATE step #-}
+
+    -- LeftRun, nothing held yet: allocate the buffer and pull the first left
+    -- element.
+    step gst (Just sa, sb, Nothing, Nothing, pa, pb, _, LeftRun, idx) = do
+        ref <- liftIO $ newIORef []
+        r <- stepa (adaptState gst) sa
+        return $ case r of
+            Yield a' sa' -> Skip
+                ( Just sa', sb, Just a', Nothing
+                , pa, pb, Just ref, RightRun, idx )
+            Skip sa' -> Skip
+                ( Just sa', sb, Nothing, Nothing
+                , pa, pb, Just ref, LeftRun, idx )
+            Stop -> Stop
+
+    -- RightRun: a left element is held, pull the first right element to
+    -- compare it with.
+    step gst (Just sa, Just sb, Just a, b, _, pb, buff, RightRun, idx) = do
+        r <- stepb (adaptState gst) sb
+        return $ case r of
+            Yield b' sb' -> Skip
+                ( Just sa, Just sb', Just a, Just b'
+                , Just a, Just b', buff, CompareRun, idx )
+            Skip sb' -> Skip
+                ( Just sa, Just sb', Just a, b
+                , Just a, pb, buff, RightRun, idx )
+            -- The right stream produced nothing, so nothing can match.
+            Stop -> Yield (a, Nothing) (leftOnly sa a idx)
+
+    -- RightDupRun: the held right element is smaller than the held left
+    -- element; pull the next right element and check it for being a repeat
+    -- of the discarded one.
+    step gst (Just sa, Just sb, Just a, b, pa, pb, buff, RightDupRun, idx) = do
+        r <- stepb (adaptState gst) sb
+        return $ case r of
+            Yield b' sb' -> Skip
+                ( Just sa, Just sb', Just a, Just b'
+                , pa, pb, buff, CompareDupRun, idx )
+            -- Keep the whole state and retry: dropping the left state here
+            -- would end the output on the first Skip of the right stream.
+            Skip sb' -> Skip
+                ( Just sa, Just sb', Just a, b
+                , pa, pb, buff, RightDupRun, idx )
+            Stop -> Yield (a, Nothing) (leftOnly sa a idx)
+
+    -- RightDupRun after the right stream has stopped: the held left element
+    -- cannot be matched any more.
+    step _ (Just sa, Nothing, Just a, _, _, _, _, RightDupRun, idx) =
+        return $ Yield (a, Nothing) (leftOnly sa a idx)
+
+    -- CompareDupRun: skip right elements equal to the one already found to
+    -- be smaller than the held left element, compare anything else.
+    step _
+        ( Just sa, sb, Just a, Just b
+        , pa, Just pb, buff, CompareDupRun, idx ) =
+        return $ Skip $
+            if b == pb
+            then
+                ( Just sa, sb, Just a, Just b
+                , pa, Just pb, buff, RightDupRun, idx )
+            else
+                ( Just sa, sb, Just a, Just b
+                , pa, Just b, buff, CompareRun, idx )
+
+    -- CompareRun: compare the held left and right elements.
+    step _ (sa, sb, Just a, Just b, pa, pb, Just buff, CompareRun, idx) =
+        case cmp a b of
+            -- The left element has no match: emit it and pull the next one.
+            LT -> return $ Yield (a, Nothing)
+                ( sa, sb, Just a, Just b
+                , pa, pb, Just buff, LeftRun, idx )
+            -- Match: buffer the right element and collect the rest of its
+            -- run. Buffering on the transition (rather than on entry to
+            -- BuffPrepare) lets BuffPrepare retry on Skip without buffering
+            -- the same element twice.
+            EQ -> do
+                liftIO $ modifyIORef' buff (b :)
+                return $ Skip
+                    ( sa, sb, Just a, Just b
+                    , Just a, pb, Just buff, BuffPrepare, idx )
+            -- The right element is too small: discard it and its repeats.
+            GT -> return $ Skip
+                ( sa, sb, Just a, Just b
+                , pa, Just b, Just buff, RightDupRun, idx )
+
+    -- BuffPrepare: the held right element is buffered; keep pulling right
+    -- elements and buffer those equal to it.
+    step gst
+        ( Just sa, Just sb, Just a, Just b
+        , pa, Just pb, Just buff, BuffPrepare, idx ) = do
+        r <- stepb (adaptState gst) sb
+        case r of
+            Yield b' sb'
+                | b' == pb -> do
+                    liftIO $ modifyIORef' buff (b' :)
+                    return $ Skip
+                        ( Just sa, Just sb', Just a, Just b'
+                        , pa, Just b', Just buff, BuffPrepare, idx )
+                -- A different value ends the run: replay the buffer.
+                | otherwise -> return $ Skip
+                    ( Just sa, Just sb', Just a, Just b'
+                    , pa, Just b', Just buff, BuffPair, 0 )
+            Skip sb' -> return $ Skip
+                ( Just sa, Just sb', Just a, Just b
+                , pa, Just pb, Just buff, BuffPrepare, idx )
+            -- The right stream is done: drop its state so that it is never
+            -- stepped again and replay the buffer.
+            Stop -> return $ Skip
+                ( Just sa, Nothing, Just a, Just b
+                , pa, Just pb, Just buff, BuffPair, 0 )
+
+    -- BuffPrepare after the right stream has stopped: the run cannot grow,
+    -- replay what is buffered.
+    step _ (sa, Nothing, a, b, pa, pb, buff, BuffPrepare, _) =
+        return $ Skip (sa, Nothing, a, b, pa, pb, buff, BuffPair, 0)
+
+    -- BuffPair: pair the held left element with the buffered right elements,
+    -- one per step.
+    step _
+        ( Just sa, sb, Just a, Just b
+        , pa, Just pb, Just buff, BuffPair, idx ) = do
+        bl <- liftIO $ readIORef buff
+        return $ case drop idx bl of
+            x : _ -> Yield (a, Just x)
+                ( Just sa, sb, Just a, Just b
+                , pa, Just pb, Just buff, BuffPair, idx + 1 )
+            -- Buffer replayed: look at the next left element.
+            [] -> Skip
+                ( Just sa, sb, Just a, Just b
+                , Just a, Just pb, Just buff, BuffReset, 0 )
+
+    -- LeftRun with both elements held: the left element was emitted
+    -- unmatched; pull the next one and compare it with the held right
+    -- element.
+    step gst (Just sa, sb, Just a, Just b, pa, pb, buff, LeftRun, idx) = do
+        r <- stepa (adaptState gst) sa
+        return $ case r of
+            Yield a' sa' -> Skip
+                ( Just sa', sb, Just a', Just b
+                , Just a, pb, buff, CompareRun, idx )
+            Skip sa' -> Skip
+                ( Just sa', sb, Just a, Just b
+                , pa, pb, buff, LeftRun, idx )
+            Stop -> Stop
+
+    -- BuffReset: the buffer has been replayed for the held left element;
+    -- pull the next one. A repeat of the previous left element reuses the
+    -- buffer, anything else clears it and is compared afresh.
+    step gst
+        ( Just sa, sb, Just a, Just b
+        , Just pa, pb, Just buff, BuffReset, idx ) = do
+        r <- stepa (adaptState gst) sa
+        case r of
+            Yield a' sa'
+                | a' == pa -> return $ Skip
+                    ( Just sa', sb, Just a', Just b
+                    , Just a', pb, Just buff, BuffPair, idx )
+                | otherwise -> do
+                    liftIO $ writeIORef buff []
+                    return $ Skip
+                        ( Just sa', sb, Just a', Just b
+                        , Just a', pb, Just buff, CompareRun, idx )
+            Skip sa' -> return $ Skip
+                ( Just sa', sb, Just a, Just b
+                , Just pa, pb, Just buff, BuffReset, idx )
+            Stop -> return Stop
+
+    -- LeftRun after the right stream has stopped: emit every remaining left
+    -- element unmatched.
+    step gst
+        ( Just sa, Nothing, Just a, Nothing
+        , Nothing, Nothing, Nothing, LeftRun, idx ) = do
+        r <- stepa (adaptState gst) sa
+        return $ case r of
+            Yield a' sa' -> Yield (a', Nothing) (leftOnly sa' a' idx)
+            Skip sa' -> Skip (leftOnly sa' a idx)
+            Stop -> Stop
+
+    -- Any other state ends the stream.
+    step _ _ = return Stop
diff --git a/test/Streamly/Test/Data/Stream/Top.hs b/test/Streamly/Test/Data/Stream/Top.hs
index 5a18ed997b..2c0bd91d70 100644
--- a/test/Streamly/Test/Data/Stream/Top.hs
+++ b/test/Streamly/Test/Data/Stream/Top.hs
@@ -102,6 +102,38 @@ joinInnerMerge =
                 (S.fromList ls1)
         let v2 = [ (i,j) | i <- ls0, j <- ls1, i == j ]
         assert (v1 == v2)
+
+joinLeftMerge :: Property
+joinLeftMerge =
+    forAll (listOf (chooseInt (min_value, max_value))) $ \ls0 ->
+        forAll (listOf (chooseInt (min_value, max_value))) $ \ls1 ->
+            -- Duplicates are kept in both lists so that the buffering and
+            -- replay of repeated keys is exercised as well.
+            monadicIO $ action (sort ls0) (sort ls1)
+
+    where
+
+    action ls0 ls1 = do
+        v1 <-
+            run
+                $ S.toList
+                $ Top.joinLeftMerge
+                    compare
+                    (S.fromList ls0)
+                    (S.fromList ls1)
+        assert (v1 == expected ls0 ls1)
+
+    -- Reference left join on lists: each left element is paired with every
+    -- equal right element, or once with Nothing when there is none.
+    expected ls0 ls1 = concatMap pairsFor ls0
+
+        where
+
+        pairsFor i =
+            case filter (== i) ls1 of
+                [] -> [(i, Nothing)]
+                js -> map (\j -> (i, Just j)) js
+
 -------------------------------------------------------------------------------
 moduleName :: String
 moduleName = "Data.Stream.Top"
@@ -114,3 +146,4 @@ main = hspec $ do
         prop "unionBySorted" Main.unionBySorted
         prop "differenceBySorted" Main.differenceBySorted
         prop "joinInnerMerge" Main.joinInnerMerge
+        prop "joinLeftMerge" Main.joinLeftMerge