diff --git a/grapesy.cabal b/grapesy.cabal index 268dc349..5e854ba0 100644 --- a/grapesy.cabal +++ b/grapesy.cabal @@ -212,7 +212,7 @@ library , exceptions >= 0.10 && < 0.11 , hashable >= 1.3 && < 1.5 , http-types >= 0.12 && < 0.13 - , http2 >= 5.3.1 && < 5.4 + , http2 >= 5.3.4 && < 5.4 , http2-tls >= 0.4.1 && < 0.5 , lens >= 5.0 && < 5.4 , mtl >= 2.2 && < 2.4 @@ -350,7 +350,7 @@ test-suite test-grapesy , containers >= 0.6 && < 0.8 , exceptions >= 0.10 && < 0.11 , http-types >= 0.12 && < 0.13 - , http2 >= 5.3.1 && < 5.4 + , http2 >= 5.3.4 && < 5.4 , lens >= 5.0 && < 5.4 , mtl >= 2.2 && < 2.4 , network >= 3.1 && < 3.3 diff --git a/interop/Interop/Client/TestCase/CancelAfterBegin.hs b/interop/Interop/Client/TestCase/CancelAfterBegin.hs index f65fb76a..307369bf 100644 --- a/interop/Interop/Client/TestCase/CancelAfterBegin.hs +++ b/interop/Interop/Client/TestCase/CancelAfterBegin.hs @@ -15,7 +15,7 @@ import Proto.API.Interop -- cancellation gets reported by the grapesy client library itself. runTest :: Cmdline -> IO () runTest cmdline = - withConnection def (testServer cmdline) $ \conn -> do + withConnection def (testServer cmdline) $ \conn -> assertThrows (assertEqual GrpcCancelled . grpcError) $ withRPC conn def (Proxy @StreamingInputCall) $ \_call -> -- Immediately cancel request diff --git a/interop/Interop/Server.hs b/interop/Interop/Server.hs index 86ebfda7..b2f87a12 100644 --- a/interop/Interop/Server.hs +++ b/interop/Interop/Server.hs @@ -88,7 +88,7 @@ withInteropServer cmdline k = do = ServerConfig { serverSecure = Nothing , serverInsecure = Just InsecureConfig { - insecureHost = Nothing + insecureHost = Just "127.0.0.1" , insecurePort = cmdPort cmdline } } diff --git a/src/Network/GRPC/Client/Call.hs b/src/Network/GRPC/Client/Call.hs index 292f0ce7..f3ef6b9b 100644 --- a/src/Network/GRPC/Client/Call.hs +++ b/src/Network/GRPC/Client/Call.hs @@ -36,12 +36,13 @@ import Control.Concurrent.STM import Control.Monad import Control.Monad.Catch import Control.Monad.IO.Class +import Data.Bifunctor import Data.Bitraversable import Data.ByteString.Char8 qualified as BS.Strict.C8 import Data.Default import Data.Foldable (asum) import Data.List (intersperse) -import Data.Maybe (fromMaybe, isJust) +import Data.Maybe (fromMaybe) import Data.Proxy import Data.Text qualified as Text import Data.Version @@ -55,13 +56,12 @@ import Network.GRPC.Common.Compression qualified as Compression import Network.GRPC.Common.StreamElem qualified as StreamElem import Network.GRPC.Spec import Network.GRPC.Util.GHC +import Network.GRPC.Util.HKD qualified as HKD import Network.GRPC.Util.HTTP2.Stream (ServerDisconnected(..)) import Network.GRPC.Util.Session qualified as Session import Network.GRPC.Util.Thread qualified as Thread import Paths_grapesy qualified as Grapesy -import Network.GRPC.Util.HKD qualified as HKD -import Data.Bifunctor {------------------------------------------------------------------------------- Open a call @@ -111,17 +111,70 @@ withRPC conn callParams proxy k = fmap fst $ generalBracket (liftIO $ startRPC conn proxy callParams) closeRPC - k + (k . fst) where - closeRPC :: Call rpc -> ExitCase a -> m () - closeRPC call exitCase = liftIO $ do + closeRPC :: (Call rpc, Session.CancelRequest) -> ExitCase a -> m () + closeRPC (call, cancelRequest) exitCase = liftIO $ do + -- /Before/ we do anything else (see below), check if we have evidence + -- that we can discard the connection. + canDiscard <- checkCanDiscard call + + -- Send the RST_STREAM frame /before/ closing the outbound thread. + -- + -- When we call 'Session.close', we will terminate the + -- 'sendMessageLoop', @http2@ will interpret this as a clean termination + -- of the stream. We must therefore cancel this stream before calling + -- 'Session.close'. /If/ the final message has already been sent, + -- @http2@ guarantees (as a postcondition of @outBodyPushFinal@) that + -- cancellation will be a no-op. + sendResetFrame cancelRequest exitCase + + -- Now close the /outbound/ thread, see docs of 'Session.close' for + -- details. mException <- liftIO $ Session.close (callChannel call) exitCase case mException of - Nothing -> return () + Nothing -> + -- The outbound thread had already terminated + return () Just ex -> case fromException ex of - Nothing -> throwM ex - Just discarded -> throwCancelled call discarded + Nothing -> + -- We are leaving the scope of 'withRPC' because of an exception + -- in the client, just rethrow that exception. + throwM ex + Just discarded -> + -- We are leaving the scope of 'withRPC' without having sent the + -- final message. + -- + -- If the server was closed before we cancelled the stream, this + -- means that the server unilaterally closed the connection. + -- This should be regarded as normal termination of the RPC (see + -- the docs for 'withRPC') + -- + -- Otherwise, the client left the scope of 'withRPC' before the + -- RPC was complete, which the gRPC spec mandates to result in a + -- 'GrpcCancelled' exception. See docs of 'throwCancelled'. + unless canDiscard $ + throwCancelled discarded + + -- Send a @RST_STREAM@ frame if necessary + sendResetFrame :: Session.CancelRequest -> ExitCase a -> IO () + sendResetFrame cancelRequest exitCase = + cancelRequest $ + case exitCase of + ExitCaseSuccess _ -> + -- Error code will be CANCEL + Nothing + ExitCaseAbort -> + -- Error code will be INTERNAL_ERROR. The client aborted with an + -- error that we don't have access to. We want to tell the server + -- that something has gone wrong (i.e. INTERNAL_ERROR), so we must + -- pass an exception, however the exact nature of the exception is + -- not particularly important as it is only recorded locally. + Just . toException $ Session.ChannelAborted callStack + ExitCaseException e -> + -- Error code will be INTERNAL_ERROR + Just e -- The spec mandates that when a client cancels a request (which in grapesy -- means exiting the scope of withRPC), the client receives a CANCELLED @@ -129,7 +182,7 @@ withRPC conn callParams proxy k = fmap fst $ -- the server might have already closed the connection. The client must have -- evidence that this is the case, which could mean one of two things: -- - -- o The received the final message from the server + -- o The client received the final message from the server -- o The server threw an exception (and the client saw this) -- -- We can check for the former using 'channelRecvFinal', and the latter @@ -141,7 +194,7 @@ withRPC conn callParams proxy k = fmap fst $ -- o If the server threw an exception, and the client observed this, then -- the inbound thread state /must/ have changed to 'ThreadException'. -- - -- Note that it /not/ sufficient to check if the inbound thread has + -- Note that it is /not/ sufficient to check if the inbound thread has -- terminated: we might have received the final message, but the thread -- might still be /about/ to terminate, but not /actually/ have terminated. -- @@ -149,30 +202,45 @@ withRPC conn callParams proxy k = fmap fst $ -- -- o -- o - throwCancelled :: Call rpc -> ChannelDiscarded -> IO () - throwCancelled Call{callChannel} (ChannelDiscarded cs) = do + throwCancelled :: ChannelDiscarded -> IO () + throwCancelled (ChannelDiscarded cs) = do + throwM $ GrpcException { + grpcError = GrpcCancelled + , grpcErrorMessage = Just $ mconcat [ + "Channel discarded by client at " + , Text.pack $ prettyCallStack cs + ] + , grpcErrorMetadata = [] + } + + checkCanDiscard :: Call rpc -> IO Bool + checkCanDiscard Call{callChannel} = do mRecvFinal <- atomically $ readTVar $ Session.channelRecvFinal callChannel + let onNotRunning :: STM () + onNotRunning = return () mTerminated <- atomically $ - Thread.hasThreadTerminated $ Session.channelInbound callChannel - let serverClosed :: Bool - serverClosed = or [ - case mRecvFinal of - Session.RecvNotFinal -> False - Session.RecvWithoutTrailers _ -> True - Session.RecvFinal _ -> True - , isJust mTerminated - ] - - unless serverClosed $ - throwM $ GrpcException { - grpcError = GrpcCancelled - , grpcErrorMessage = Just $ mconcat [ - "Channel discarded by client at " - , Text.pack $ prettyCallStack cs - ] - , grpcErrorMetadata = [] - } + Thread.getThreadState_ + (Session.channelInbound callChannel) + onNotRunning + return $ + or [ + case mRecvFinal of + Session.RecvNotFinal -> False + Session.RecvWithoutTrailers _ -> True + Session.RecvFinal _ -> True + + -- We are checking if we have evidence that we can discard the + -- channel. If the inbound thread is not yet running, this implies + -- that the server has not yet initiated their response to us, + -- which means we have no evidence to believe we can discard the + -- channel. + , case mTerminated of + Thread.ThreadNotYetRunning_ () -> False + Thread.ThreadRunning_ -> False + Thread.ThreadDone_ -> True + Thread.ThreadException_ _ -> True + ] -- | Open new channel to the server -- @@ -186,7 +254,7 @@ startRPC :: forall rpc. => Connection -> Proxy rpc -> CallParams rpc - -> IO (Call rpc) + -> IO (Call rpc, Session.CancelRequest) startRPC conn _ callParams = do (connClosed, connToServer) <- Connection.getConnectionToServer conn cOut <- Connection.getOutboundCompression conn @@ -205,7 +273,7 @@ startRPC conn _ callParams = do . grpcClassifyTermination . either trailersOnlyToProperTrailers' id - channel <- + (channel, cancelRequest) <- Session.setupRequestChannel session connToServer @@ -235,7 +303,7 @@ startRPC conn _ callParams = do _mAlreadyClosed <- Session.close channel exitReason return () - return $ Call channel + return (Call channel, cancelRequest) where connParams :: ConnParams connParams = Connection.connParams conn @@ -308,7 +376,7 @@ sendInputWithMeta Call{callChannel} msg = liftIO $ do -- This should be called before exiting the scope of 'withRPC'. StreamElem.whenDefinitelyFinal msg $ \_ -> - void $ Session.waitForOutbound callChannel + Session.waitForOutbound callChannel -- | Receive an output from the peer -- diff --git a/src/Network/GRPC/Server/Call.hs b/src/Network/GRPC/Server/Call.hs index 4f7f4113..f848da5e 100644 --- a/src/Network/GRPC/Server/Call.hs +++ b/src/Network/GRPC/Server/Call.hs @@ -405,12 +405,12 @@ sendOutputWithMeta call@Call{callChannel} msg = do msg' <- bitraverse mkTrailers return msg Session.send callChannel msg' - -- This /must/ be called before leaving the scope of 'acceptCall' (or we + -- This /must/ be called before leaving the scope of 'runHandler' (or we -- risk that the HTTP2 stream is cancelled). We can't call 'waitForOutbound' - -- /in/ 'acceptCall', because if the handler for whatever reason never + -- /in/ 'runHandler', because if the handler for whatever reason never -- writes the final message, such a call would block indefinitely. StreamElem.whenDefinitelyFinal msg $ \_ -> - void $ Session.waitForOutbound callChannel + Session.waitForOutbound callChannel where mkTrailers :: ResponseTrailingMetadata rpc -> IO ProperTrailers mkTrailers metadata = do @@ -650,7 +650,7 @@ sendProperTrailers Call{callContext, callResponseKickoff, callChannel} -- If we didn't update, then the response has already been initiated and -- we cannot make use of the Trailers-Only case. Session.send callChannel (NoMoreElems trailers) - void $ Session.waitForOutbound callChannel + Session.waitForOutbound callChannel where ServerContext{serverParams} = callContext diff --git a/src/Network/GRPC/Server/RequestHandler/API.hs b/src/Network/GRPC/Server/RequestHandler/API.hs index 8efd849f..cf23cf4b 100644 --- a/src/Network/GRPC/Server/RequestHandler/API.hs +++ b/src/Network/GRPC/Server/RequestHandler/API.hs @@ -29,9 +29,9 @@ requestHandlerToServer :: -- ^ Request handler -- -- We can assume in 'requestHandlerToServer' that the handler will not - -- throw any exceptions(doing so will cause @http2@ to reset the stream, + -- throw any exceptions (doing so will cause @http2@ to reset the stream, -- which is not always the right thing to do; see detailed comments in - -- 'acceptCall'). It is the responsibility of 'serverTopLevel' (prior to + -- 'runHandler'). It is the responsibility of 'serverTopLevel' (prior to -- calling 'requestHandlerToServer') to catch any remaining exceptions. -> HTTP2.Server requestHandlerToServer handler req _aux respond = diff --git a/test-grapesy/Test/Driver/Dialogue/Execution.hs b/test-grapesy/Test/Driver/Dialogue/Execution.hs index 13eea7bb..62fd76b4 100644 --- a/test-grapesy/Test/Driver/Dialogue/Execution.hs +++ b/test-grapesy/Test/Driver/Dialogue/Execution.hs @@ -2,7 +2,8 @@ {-# OPTIONS_GHC -Wno-orphans #-} module Test.Driver.Dialogue.Execution ( - execGlobalSteps + ConnUsage(..) + , execGlobalSteps ) where import Control.Concurrent @@ -16,6 +17,7 @@ import Data.Proxy import Data.Text qualified as Text import GHC.Stack import GHC.TypeLits +import Network.HTTP2.Client qualified as HTTP2.Client import Network.GRPC.Client qualified as Client import Network.GRPC.Client.Binary qualified as Client.Binary @@ -271,21 +273,17 @@ clientLocal clock call = \(LocalSteps steps) -> clientGlobal :: TestClock - -> Bool + -> ConnUsage -- ^ Use new connection for each RPC call? -- -- Multiple RPC calls on a single connection /ought/ to be independent of - -- each other, with something going wrong on one should not affect another. - -- This is currently however not the case, I /think/ due to limitations of - -- @http2@. - -- - -- See . + -- each other. Something going wrong on one should not affect another. -> GlobalSteps -> TestClient -clientGlobal clock connPerRPC global connParams testServer delimitTestScope = - if connPerRPC - then go Nothing [] (getGlobalSteps global) - else withConn $ \c -> go (Just c) [] (getGlobalSteps global) +clientGlobal clock connUsage global connParams testServer delimitTestScope = + case connUsage of + ConnPerRPC -> go Nothing [] (getGlobalSteps global) + SharedConn -> withConn $ \c -> go (Just c) [] (getGlobalSteps global) where withConn :: (Client.Connection -> IO ()) -> IO () withConn = Client.withConnection connParams testServer @@ -413,12 +411,7 @@ serverLocal clock call = \(LocalSteps steps) -> do Terminate mErr -> do mInp <- liftIO $ try $ within timeoutReceive action $ Server.Binary.recvInput call - -- TODO: - -- - -- On the server side we cannot distinguish regular client - -- termination from an exception when receiving. - let expectation = isExpectedElem $ NoMoreElems NoMetadata - expect (tick, action) expectation mInp + expect (tick, action) isClientDisconnected mInp modify $ ifPeerAlive $ PeerTerminated $ DeliberateException <$> mErr -- Wait for the client disconnect to become visible @@ -428,12 +421,6 @@ serverLocal clock call = \(LocalSteps steps) -> do -- terminate more-or-less immediately, this does not necessarily indicate -- any kind of failure: the client may simply have put the call in -- half-closed mode. - -- - -- TODO: - -- However, when the client terminates early and we are not using one - -- connection per RPC (i.e. we are sharing a connection), the server will - -- /never/ realize that the client has disappeared. See the discussion in - -- the issue above. waitForClientDisconnect :: IO () waitForClientDisconnect = within timeoutFailure () $ loop @@ -457,6 +444,16 @@ serverLocal clock call = \(LocalSteps steps) -> do isExpectedElem _ (Left _) = False isExpectedElem expectedElem (Right streamElem) = expectedElem == streamElem + isClientDisconnected :: + Either Server.ClientDisconnected (StreamElem NoMetadata Int) + -> Bool + isClientDisconnected (Left (Server.ClientDisconnected e _)) + | Just HTTP2.Client.ConnectionIsClosed <- fromException e + = True + | otherwise + = False + isClientDisconnected _ = False + serverGlobal :: HasCallStack => TestClock @@ -495,8 +492,10 @@ serverGlobal clock globalStepsVar call = do Top-level -------------------------------------------------------------------------------} -execGlobalSteps :: GlobalSteps -> IO ClientServerTest -execGlobalSteps steps = do +data ConnUsage = SharedConn | ConnPerRPC + +execGlobalSteps :: ConnUsage -> GlobalSteps -> IO ClientServerTest +execGlobalSteps connUsage steps = do globalStepsVar <- newMVar (order steps) clock <- TestClock.new @@ -513,7 +512,7 @@ execGlobalSteps steps = do expectEarlyClientTermination = clientTerminatesEarly , expectEarlyServerTermination = serverTerminatesEarly } - , client = clientGlobal clock connPerRPC steps + , client = clientGlobal clock connUsage steps , server = [ handler (Proxy @TestRpc1) , handler (Proxy @TestRpc2) @@ -524,9 +523,6 @@ execGlobalSteps steps = do clientTerminatesEarly, serverTerminatesEarly :: Bool (clientTerminatesEarly, serverTerminatesEarly) = hasEarlyTermination steps - connPerRPC :: Bool - connPerRPC = serverTerminatesEarly || clientTerminatesEarly - -- For 'clientGlobal' the order doesn't matter, because it spawns a thread -- for each 'LocalSteps'. The server however doesn't get this option; the -- threads /get/ spawnwed for each incoming connection, and must feel off diff --git a/test-grapesy/Test/Prop/Dialogue.hs b/test-grapesy/Test/Prop/Dialogue.hs index d7bab278..69d5d1c2 100644 --- a/test-grapesy/Test/Prop/Dialogue.hs +++ b/test-grapesy/Test/Prop/Dialogue.hs @@ -15,39 +15,48 @@ import Test.Driver.Dialogue tests :: TestTree tests = testGroup "Test.Prop.Dialogue" [ testGroup "Regression" [ - testCase "trivial1" $ regression trivial1 - , testCase "trivial2" $ regression trivial2 - , testCase "trivial3" $ regression trivial3 - , testCase "concurrent1" $ regression concurrent1 - , testCase "concurrent2" $ regression concurrent2 - , testCase "concurrent3" $ regression concurrent3 - , testCase "concurrent4" $ regression concurrent4 - , testCase "exception1" $ regression exception1 - , testCase "exception2" $ regression exception2 - , testCase "earlyTermination01" $ regression earlyTermination01 - , testCase "earlyTermination02" $ regression earlyTermination02 - , testCase "earlyTermination03" $ regression earlyTermination03 - , testCase "earlyTermination04" $ regression earlyTermination04 - , testCase "earlyTermination05" $ regression earlyTermination05 - , testCase "earlyTermination06" $ regression earlyTermination06 - , testCase "earlyTermination07" $ regression earlyTermination07 - , testCase "earlyTermination08" $ regression earlyTermination08 - , testCase "earlyTermination09" $ regression earlyTermination09 - , testCase "earlyTermination10" $ regression earlyTermination10 - , testCase "earlyTermination11" $ regression earlyTermination11 - , testCase "earlyTermination12" $ regression earlyTermination12 - , testCase "earlyTermination13" $ regression earlyTermination13 - , testCase "earlyTermination14" $ regression earlyTermination14 - , testCase "allowHalfClosed1" $ regression allowHalfClosed1 - , testCase "allowHalfClosed2" $ regression allowHalfClosed2 - , testCase "allowHalfClosed3" $ regression allowHalfClosed3 + testCase "trivial1" $ regression SharedConn trivial1 + , testCase "trivial2" $ regression SharedConn trivial2 + , testCase "trivial3" $ regression SharedConn trivial3 + , testCase "concurrent1" $ regression SharedConn concurrent1 + , testCase "concurrent2" $ regression SharedConn concurrent2 + , testCase "concurrent3" $ regression SharedConn concurrent3 + , testCase "concurrent4" $ regression SharedConn concurrent4 + , testCase "exception1" $ regression ConnPerRPC exception1 + , testCase "exception2" $ regression ConnPerRPC exception2 + , testCase "earlyTermination01" $ regression ConnPerRPC earlyTermination01 + , testCase "earlyTermination02" $ regression ConnPerRPC earlyTermination02 + , testCase "earlyTermination03" $ regression ConnPerRPC earlyTermination03 + , testCase "earlyTermination04" $ regression ConnPerRPC earlyTermination04 + , testCase "earlyTermination05" $ regression ConnPerRPC earlyTermination05 + , testCase "earlyTermination06" $ regression ConnPerRPC earlyTermination06 + , testCase "earlyTermination07" $ regression ConnPerRPC earlyTermination07 + , testCase "earlyTermination08" $ regression ConnPerRPC earlyTermination08 + , testCase "earlyTermination09" $ regression ConnPerRPC earlyTermination09 + , testCase "earlyTermination10" $ regression ConnPerRPC earlyTermination10 + , testCase "earlyTermination11" $ regression ConnPerRPC earlyTermination11 + , testCase "earlyTermination12" $ regression ConnPerRPC earlyTermination12 + , testCase "earlyTermination13" $ regression ConnPerRPC earlyTermination13 + , testCase "earlyTermination14" $ regression ConnPerRPC earlyTermination14 + , testCase "unilateralTermination1" $ regression SharedConn unilateralTermination1 + , testCase "unilateralTermination2" $ regression SharedConn unilateralTermination2 + , testCase "unilateralTermination3" $ regression SharedConn unilateralTermination3 + , testCase "allowHalfClosed1" $ regression SharedConn allowHalfClosed1 + , testCase "allowHalfClosed2" $ regression SharedConn allowHalfClosed2 + , testCase "allowHalfClosed3" $ regression ConnPerRPC allowHalfClosed3 ] , testGroup "Setup" [ testProperty "shrinkingWellFounded" prop_shrinkingWellFounded ] , testGroup "Arbitrary" [ - testProperty "withoutExceptions" arbitraryWithoutExceptions - , testProperty "withExceptions" arbitraryWithExceptions + testGroup "WithoutExceptions" [ + testProperty "connPerRPC" $ arbitraryWithoutExceptions ConnPerRPC + , testProperty "sharedConn" $ arbitraryWithoutExceptions SharedConn + ] + , testGroup "WithExceptions" [ + testProperty "connPerRPC" $ arbitraryWithExceptions ConnPerRPC + , testProperty "sharedConn" $ arbitraryWithExceptions SharedConn + ] ] ] @@ -66,26 +75,26 @@ prop_shrinkingWellFounded = Running the tests -------------------------------------------------------------------------------} -arbitraryWithoutExceptions :: DialogueWithoutExceptions -> Property -arbitraryWithoutExceptions (DialogueWithoutExceptions dialogue) = - propDialogue dialogue +arbitraryWithoutExceptions :: ConnUsage -> DialogueWithoutExceptions -> Property +arbitraryWithoutExceptions connUsage (DialogueWithoutExceptions dialogue) = + propDialogue connUsage dialogue -arbitraryWithExceptions :: DialogueWithExceptions -> Property -arbitraryWithExceptions (DialogueWithExceptions dialogue) = - propDialogue dialogue +arbitraryWithExceptions :: ConnUsage -> DialogueWithExceptions -> Property +arbitraryWithExceptions connUsage (DialogueWithExceptions dialogue) = + propDialogue connUsage dialogue -propDialogue :: Dialogue -> Property -propDialogue dialogue = +propDialogue :: ConnUsage -> Dialogue -> Property +propDialogue connUsage dialogue = counterexample (show globalSteps) $ - propClientServer $ execGlobalSteps globalSteps + propClientServer $ execGlobalSteps connUsage globalSteps where globalSteps :: GlobalSteps globalSteps = dialogueGlobalSteps dialogue -regression :: Dialogue -> IO () -regression dialogue = +regression :: ConnUsage -> Dialogue -> IO () +regression connUsage dialogue = handle (throwIO . RegressionTestFailed globalSteps) $ - testClientServer =<< execGlobalSteps globalSteps + testClientServer =<< execGlobalSteps connUsage globalSteps where globalSteps :: GlobalSteps globalSteps = dialogueGlobalSteps dialogue @@ -359,6 +368,35 @@ earlyTermination14 = NormalizedDialogue [ , (0, ServerAction $ Terminate (Just (SomeServerException 0))) ] +unilateralTermination1 :: Dialogue +unilateralTermination1 = NormalizedDialogue [ + (1,ClientAction (Initiate (def,RPC1))) + , (1,ServerAction (Send (FinalElem 0 def))) + , (0,ClientAction (Initiate (def,RPC1))) + , (0,ClientAction (Send (NoMoreElems NoMetadata))) + , (0,ServerAction (Send (NoMoreElems def))) + ] + +unilateralTermination2 :: Dialogue +unilateralTermination2 = NormalizedDialogue [ + (1,ClientAction (Initiate (def,RPC1))) + , (1,ServerAction (Send (FinalElem 0 def))) + , (0,ClientAction (Initiate (def,RPC1))) + , (0,ClientAction (Send (NoMoreElems NoMetadata))) + , (0,ServerAction (Send (NoMoreElems def))) + , (2,ClientAction (Initiate (def,RPC1))) + , (2,ServerAction (Send (FinalElem 0 def))) + ] + +unilateralTermination3 :: Dialogue +unilateralTermination3 = NormalizedDialogue [ + (0, ClientAction (Initiate (def,RPC1))) + , (0, ServerAction (Send (NoMoreElems def))) + , (1, ClientAction (Initiate (def,RPC1))) + , (1, ClientAction (Send (FinalElem 0 NoMetadata))) + , (1, ServerAction (Send (NoMoreElems def))) + ] + {------------------------------------------------------------------------------- Dealing correctly with 'AllowHalfClosed' diff --git a/test-grapesy/Test/Sanity/Exception.hs b/test-grapesy/Test/Sanity/Exception.hs index 0e97e6a5..b91a2454 100644 --- a/test-grapesy/Test/Sanity/Exception.hs +++ b/test-grapesy/Test/Sanity/Exception.hs @@ -46,7 +46,7 @@ tests = testGroup "Test.Sanity.Exception" [ -- | Client makes many concurrent calls, throws an exception during one of them. test_clientException :: IO () test_clientException = testClientServer $ ClientServerTest { - config = def + config = def { expectEarlyClientTermination = True } , client = simpleTestClient $ \conn -> do -- Make 100 concurrent calls. 99 of them counting to 50, and one -- more that throws an exception once it reaches 10. @@ -129,22 +129,23 @@ test_serverException = do -- does not wait for client termination. test_earlyTerminationNoWait :: IO () test_earlyTerminationNoWait = testClientServer $ ClientServerTest { - config = def + config = def { expectEarlyClientTermination = True } , client = simpleTestClient $ \conn -> do _mResult <- try @DeliberateException $ Client.withRPC conn def (Proxy @Trivial) $ \_call -> throwIO (DeliberateException $ SomeServerException 0) - Client.withRPC conn def (Proxy @Trivial) $ \call -> do + result <- Client.withRPC conn def (Proxy @Trivial) $ \call -> do Binary.sendFinalInput @Word8 call 0 - _output <- Binary.recvOutput @Word8 call - return () + Binary.recvFinalOutput @Word8 call + + assertEqual "" (1, NoMetadata) result , server = [ Server.someRpcHandler $ Server.mkRpcHandler @Trivial $ \call -> Binary.recvInput @Word8 call >>= \case - _ -> Server.sendTrailers call NoMetadata + _ -> Binary.sendFinalOutput @Word8 call (1, NoMetadata) ] } @@ -180,10 +181,5 @@ incUntilFinal call = do FinalElem n _ -> do Binary.sendFinalOutput @Word64 call (succ n, NoMetadata) NoMoreElems _ -> do - -- TODO: - -- - -- We shouldn't need to handle this case, since our client never - -- explicitly sends 'NoMoreElems'. However, see discussion in the - -- ticket above. - Server.sendTrailers call NoMetadata - return () + -- This is never hit, since our client never sends 'NoMoreElems'. + error "Test.Sanity.Exception.incUntilFinal: unexpected NoMoreElems" diff --git a/util/Network/GRPC/Util/Session.hs b/util/Network/GRPC/Util/Session.hs index ccff3e04..869361d0 100644 --- a/util/Network/GRPC/Util/Session.hs +++ b/util/Network/GRPC/Util/Session.hs @@ -44,6 +44,7 @@ module Network.GRPC.Util.Session ( -- ** Construction -- *** Client , ConnectionToServer(..) + , CancelRequest , setupRequestChannel -- *** Server , ConnectionToClient(..) diff --git a/util/Network/GRPC/Util/Session/Channel.hs b/util/Network/GRPC/Util/Session/Channel.hs index ebb87de9..10bd22c3 100644 --- a/util/Network/GRPC/Util/Session/Channel.hs +++ b/util/Network/GRPC/Util/Session/Channel.hs @@ -106,7 +106,7 @@ data Channel sess = Channel { } data RecvFinal flow = - -- | We have not yet delivered the final message to the clinet + -- | We have not yet delivered the final message to the client RecvNotFinal -- | We delivered the final message, but not yet the trailers @@ -394,7 +394,7 @@ data RecvAfterFinal = -- | Wait for the outbound thread to terminate -- -- See 'close' for discussion. -waitForOutbound :: Channel sess -> IO (FlowState (Outbound sess)) +waitForOutbound :: Channel sess -> IO () waitForOutbound Channel{channelOutbound} = atomically $ waitForNormalThreadTermination channelOutbound @@ -416,7 +416,7 @@ waitForOutbound Channel{channelOutbound} = atomically $ -- 2. Proper procedure for outbound messages was not followed (see above) -- -- In the case of (2) this is bug in the caller, and so 'close' will return an --- exception. In the case of (1), howvever, very likely an exception will +-- exception. In the case of (1), however, very likely an exception will -- /already/ have been thrown when a communication attempt was made, and 'close' -- will return 'Nothing'. This matches the design philosophy in @grapesy@ that -- exceptions are thrown \"lazily\" rather than \"strictly\". @@ -434,7 +434,7 @@ close Channel{channelOutbound} reason = do AlreadyTerminated _ -> return $ Nothing AlreadyAborted _err -> - -- Connection_ to the peer was lost prior to closing + -- Connection to the peer was lost prior to closing return $ Nothing Cancelled -> -- Proper procedure for outbound messages was not followed @@ -449,7 +449,7 @@ close Channel{channelOutbound} reason = do -- | Channel was closed because it was discarded -- --- This typically corresponds to leaving the scope of 'acceptCall' or +-- This typically corresponds to leaving the scope of 'runHandler' or -- 'withRPC' (without throwing an exception). data ChannelDiscarded = ChannelDiscarded CallStack deriving stock (Show) @@ -544,7 +544,6 @@ sendMessageLoop sess st stream = do loop :: IO (Trailers (Outbound sess)) loop = do msg <- atomically $ takeTMVar (flowMsg st) - case msg of StreamElem x -> do writeChunk stream $ build x @@ -554,6 +553,13 @@ sendMessageLoop sess st stream = do writeChunkFinal stream $ build x return trailers NoMoreElems trailers -> do + -- It is crucial to still 'writeChunkFinal' here to guarantee that + -- cancellation is a no-op. Without it, cancellation may result in a + -- @RST_STREAM@ frame may being sent to the peer. + -- + -- This does not necessarily write a DATA frame, since http2 avoids + -- writing empty data frames unless they are marked @END_OF_STREAM@. + writeChunkFinal stream $ mempty return trailers -- | Receive all messages sent by the node's peer @@ -618,9 +624,9 @@ outboundTrailersMaker sess Channel{channelOutbound} regular = go go :: HTTP2.TrailersMaker go (Just _) = return $ HTTP2.NextTrailersMaker go go Nothing = do - mFlowState <- atomically $ do - (Right <$> readTMVar (flowTerminated regular)) - `orElse` (Left <$> waitForAbnormalThreadTermination channelOutbound) + mFlowState <- atomically $ + unlessAbnormallyTerminated channelOutbound $ + readTMVar (flowTerminated regular) case mFlowState of Right trailers -> return $ HTTP2.Trailers $ buildOutboundTrailers sess trailers diff --git a/util/Network/GRPC/Util/Session/Client.hs b/util/Network/GRPC/Util/Session/Client.hs index 5ea1df51..bfd74553 100644 --- a/util/Network/GRPC/Util/Session/Client.hs +++ b/util/Network/GRPC/Util/Session/Client.hs @@ -2,10 +2,13 @@ module Network.GRPC.Util.Session.Client ( ConnectionToServer(..) , NoTrailers(..) + , CancelRequest , setupRequestChannel ) where +import Control.Concurrent import Control.Concurrent.STM +import Control.Monad import Control.Monad.Catch import Data.ByteString qualified as BS.Strict import Data.ByteString qualified as Strict (ByteString) @@ -83,6 +86,8 @@ class NoTrailers sess where -- | There is no interesting information in the trailers noTrailers :: Proxy sess -> Trailers (Outbound sess) +type CancelRequest = Maybe SomeException -> IO () + -- | Setup request channel -- -- This initiates a new request. @@ -95,7 +100,7 @@ setupRequestChannel :: forall sess. -- ^ We assume that when the server closes their outbound connection to us, -- the entire conversation is over (i.e., the server cannot "half-close"). -> FlowStart (Outbound sess) - -> IO (Channel sess) + -> IO (Channel sess, CancelRequest) setupRequestChannel sess ConnectionToServer{sendRequest} terminateCall @@ -104,6 +109,10 @@ setupRequestChannel sess channel <- initChannel let requestInfo = buildRequestInfo sess outboundStart + cancelRequestVar <- newEmptyMVar + let cancelRequest :: CancelRequest + cancelRequest e = join . (fmap ($ e)) $ readMVar cancelRequestVar + case outboundStart of FlowStartRegular headers -> do regular <- initFlowStateRegular headers @@ -112,7 +121,7 @@ setupRequestChannel sess (requestMethod requestInfo) (requestPath requestInfo) (requestHeaders requestInfo) - $ outboundThread channel regular + $ outboundThread channel cancelRequestVar regular forkRequest channel req FlowStartNoMessages trailers -> do let state :: FlowState (Outbound sess) @@ -123,6 +132,8 @@ setupRequestChannel sess (requestMethod requestInfo) (requestPath requestInfo) (requestHeaders requestInfo) + -- Can't cancel non-streaming request + putMVar cancelRequestVar $ \_ -> return () atomically $ modifyTVar (channelOutbound channel) $ \oldState -> case oldState of @@ -132,7 +143,7 @@ setupRequestChannel sess error "setupRequestChannel: expected thread state" forkRequest channel req - return channel + return (channel, cancelRequest) where _ = addConstraint @(NoTrailers sess) @@ -174,12 +185,14 @@ setupRequestChannel sess outboundThread :: Channel sess + -> MVar CancelRequest -> RegularFlowState (Outbound sess) -> Client.OutBodyIface -> IO () - outboundThread channel regular iface = + outboundThread channel cancelRequestVar regular iface = threadBody "grapesy:clientOutbound" (channelOutbound channel) $ \markReady _debugId -> do markReady $ FlowStateRegular regular + putMVar cancelRequestVar (Client.outBodyCancel iface) stream <- clientOutputStream iface -- Unlike the client inbound thread, or the inbound/outbound threads -- of the server, http2 knows about this particular thread and may diff --git a/util/Network/GRPC/Util/Thread.hs b/util/Network/GRPC/Util/Thread.hs index adafaee2..f8f1bc77 100644 --- a/util/Network/GRPC/Util/Thread.hs +++ b/util/Network/GRPC/Util/Thread.hs @@ -14,17 +14,19 @@ module Network.GRPC.Util.Thread ( -- * Access thread state , CancelResult(..) , cancelThread + , ThreadState_(..) + , getThreadState_ + , unlessAbnormallyTerminated , withThreadInterface , waitForNormalThreadTermination - , waitForAbnormalThreadTermination , waitForNormalOrAbnormalThreadTermination - , hasThreadTerminated ) where import Control.Concurrent import Control.Concurrent.STM import Control.Exception import Control.Monad +import Data.Void (Void, absurd) import Foreign (newStablePtr, freeStablePtr) import GHC.Stack import System.IO.Unsafe (unsafePerformIO) @@ -306,38 +308,62 @@ withThreadInterface state k = -- | Wait for the thread to terminate normally -- -- If the thread terminated with an exception, this rethrows that exception. -waitForNormalThreadTermination :: TVar (ThreadState a) -> STM a +waitForNormalThreadTermination :: TVar (ThreadState a) -> STM () waitForNormalThreadTermination state = - waitForNormalOrAbnormalThreadTermination state >>= either throwSTM return + waitUntilInitialized state >>= \case + ThreadNotYetRunning_ v -> absurd v + ThreadRunning_ -> retry + ThreadDone_ -> return () + ThreadException_ e -> throwSTM e -- | Wait for the thread to terminate normally or abnormally waitForNormalOrAbnormalThreadTermination :: TVar (ThreadState a) - -> STM (Either SomeException a) + -> STM (Maybe SomeException) waitForNormalOrAbnormalThreadTermination state = - hasThreadTerminated state >>= maybe retry return - --- | Wait for the thread to terminate abnormally -waitForAbnormalThreadTermination :: + waitUntilInitialized state >>= \case + ThreadNotYetRunning_ v -> absurd v + ThreadRunning_ -> retry + ThreadDone_ -> return $ Nothing + ThreadException_ e -> return $ Just e + +-- | Run the specified transaction, unless the thread terminated with an +-- exception +unlessAbnormallyTerminated :: + TVar (ThreadState a) + -> STM b + -> STM (Either SomeException b) +unlessAbnormallyTerminated state f = + waitUntilInitialized state >>= \case + ThreadNotYetRunning_ v -> absurd v + ThreadRunning_ -> Right <$> f + ThreadDone_ -> Right <$> f + ThreadException_ e -> return $ Left e + +waitUntilInitialized :: TVar (ThreadState a) - -> STM SomeException -waitForAbnormalThreadTermination state = - hasThreadTerminated state >>= \case - Just (Left e) -> return e - _otherwise -> retry - --- | Has the thread terminated? -hasThreadTerminated :: + -> STM (ThreadState_ Void) +waitUntilInitialized state = getThreadState_ state retry + +-- | An abstraction of 'ThreadState' without the public interface type. +data ThreadState_ notRunning = + ThreadNotYetRunning_ notRunning + | ThreadRunning_ + | ThreadDone_ + | ThreadException_ SomeException + +getThreadState_ :: TVar (ThreadState a) - -> STM (Maybe (Either SomeException a)) -hasThreadTerminated state = do + -> STM notRunning + -> STM (ThreadState_ notRunning) +getThreadState_ state onNotRunning = do st <- readTVar state case st of - ThreadNotStarted _ -> retry - ThreadInitializing _ _ -> retry - ThreadRunning _ _ _ -> return $ Nothing - ThreadDone _ a -> return $ Just (Right a) - ThreadException _ e -> return $ Just (Left e) + ThreadNotStarted _ -> ThreadNotYetRunning_ <$> onNotRunning + ThreadInitializing _ _ -> ThreadNotYetRunning_ <$> onNotRunning + ThreadRunning _ _ _ -> return $ ThreadRunning_ + ThreadDone _ _ -> return $ ThreadDone_ + ThreadException _ e -> return $ ThreadException_ e {------------------------------------------------------------------------------- Internal auxiliary