diff --git a/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor.hs b/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor.hs index 424aa054932..39a4c5932ce 100644 --- a/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor.hs +++ b/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor.hs @@ -42,6 +42,7 @@ import Control.Concurrent.Class.MonadSTM.Strict import Control.Exception (SomeAsyncException (..)) import Control.Monad (foldM) import Control.Monad.Class.MonadAsync +import Control.Monad.Class.MonadFork import Control.Monad.Class.MonadThrow import Control.Monad.Class.MonadTime.SI import Control.Monad.Class.MonadTimer.SI @@ -154,8 +155,11 @@ with } k = do + labelThisThread "inbound-governor" var <- newTVarIO (mkPublicState emptyState) - withAsync (inboundGovernorLoop var emptyState + withAsync ((do + labelThisThread "inbound-governor-loop" + inboundGovernorLoop var emptyState) `catch` handleError var) $ \thread -> diff --git a/ouroboros-network-framework/src/Ouroboros/Network/Server2.hs b/ouroboros-network-framework/src/Ouroboros/Network/Server2.hs index f8d65d40842..662421f7429 100644 --- a/ouroboros-network-framework/src/Ouroboros/Network/Server2.hs +++ b/ouroboros-network-framework/src/Ouroboros/Network/Server2.hs @@ -38,7 +38,6 @@ import Control.Monad.Class.MonadTimer.SI import Control.Tracer (Tracer, contramap, traceWith) import Data.ByteString.Lazy (ByteString) -import Data.List as List (foldl') import Data.List.NonEmpty (NonEmpty) import Data.List.NonEmpty qualified as NonEmpty import Data.Void (Void, absurd) @@ -166,18 +165,24 @@ with Arguments { InboundGovernor.idleTimeout = inboundIdleTimeout, InboundGovernor.connectionManager = connectionManager } $ \inboundGovernorThread readPublicInboundState -> - withAsync (k inboundGovernorThread readPublicInboundState) $ \actionThread -> do + withAsync (do + labelThisThread "Server2 (ouroboros-network-framework)" + k inboundGovernorThread readPublicInboundState) $ \actionThread -> do let acceptLoops :: [m Void] acceptLoops = - [ (accept snocket socket >>= acceptLoop localAddress) + [ (do + labelThisThread ("accept " ++ show localAddress) + accept snocket socket >>= acceptLoop localAddress) `finally` close snocket socket | (localAddress, socket) <- localAddresses `zip` sockets ] -- race all `acceptLoops` with `actionThread` and -- `inboundGovernorThread` - List.foldl' (\as io -> fn <$> as `race` io) - (fn <$> actionThread `waitEither` inboundGovernorThread) - acceptLoops + let waiter = fn <$> (do + labelThisThread "racing-action-inbound-governor" + actionThread `waitEither` inboundGovernorThread) + + (fn <$> waiter `race` (labelThisThread "racing-accept-loops" >> raceAll acceptLoops)) `finally` traceWith tracer TrServerStopped `catch` @@ -191,6 +196,13 @@ with Arguments { fn (Left x) = x fn (Right v) = absurd v + raceAll asyncs = withAsyncAll asyncs (fmap snd . waitAny) + + withAsyncAll xs0 action = go [] xs0 + where + go as [] = action (reverse as) + go as (x:xs) = withAsync x (\a -> go (a:as) xs) + acceptLoop :: peerAddr -> Accept m socket peerAddr -> m Void @@ -295,4 +307,3 @@ data Trace peerAddr -- ^ similar to 'TrAcceptConnection' but it is logged once the connection is -- handed to inbound connection manager, e.g. after handshake negotiation. deriving Show - diff --git a/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs b/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs index f9243857d2e..58880144bc9 100644 --- a/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs +++ b/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs @@ -45,14 +45,14 @@ import Control.Monad.Class.MonadTimer.SI import Control.Monad.Fix (MonadFix) import Control.Tracer (Tracer, contramap, nullTracer, traceWith) import Data.ByteString.Lazy (ByteString) -import Data.Foldable (asum) +import Data.Function ((&)) import Data.Hashable (Hashable) import Data.IP (IP) import Data.IP qualified as IP import Data.List.NonEmpty (NonEmpty (..)) import Data.Map (Map) import Data.Map qualified as Map -import Data.Maybe (catMaybes, maybeToList) +import Data.Maybe (catMaybes) import Data.Proxy (Proxy (..)) import Data.Typeable (Typeable) import Data.Void (Void) @@ -703,12 +703,13 @@ runM Interfaces -- Thread to which 'RethrowPolicy' will throw fatal exceptions. mainThreadId <- myThreadId - Async.runConcurrently - $ asum - $ Async.Concurrently <$> - ( mkRemoteThread mainThreadId - : maybeToList (mkLocalThread mainThreadId <$> daLocalAddress) - ) + -- If we have a local address, race the remote and local threads. Otherwise + -- just launch the remote thread. + mkRemoteThread mainThreadId & + (case daLocalAddress of + Nothing -> id + Just addr -> (fmap (either id id) . (`Async.race` mkLocalThread mainThreadId addr)) + ) where (ledgerPeersRng, rng1) = split diRng @@ -753,8 +754,9 @@ runM Interfaces -- | mkLocalThread - create local connection manager mkLocalThread :: ThreadId m -> Either ntcFd ntcAddr -> m Void - mkLocalThread mainThreadId localAddr = - withLocalSocket tracer diNtcGetFileDescriptor diNtcSnocket localAddr + mkLocalThread mainThreadId localAddr = do + labelThisThread "local connection manager" + withLocalSocket tracer diNtcGetFileDescriptor diNtcSnocket localAddr $ \localSocket -> do localInbInfoChannel <- newInformationChannel @@ -832,6 +834,7 @@ runM Interfaces mkRemoteThread :: ThreadId m -> m Void mkRemoteThread mainThreadId = do + labelThisThread "remote connection manager" let exitPolicy :: ExitPolicy a exitPolicy = stdExitPolicy daReturnPolicy @@ -1177,11 +1180,15 @@ runM Interfaces PeerSelectionActionsDiffusionMode { psPeerStateActions = peerStateActions } $ \(ledgerPeersThread, localRootPeersProvider) peerSelectionActions -> Async.withAsync - (peerSelectionGovernor' dtDebugPeerSelectionInitiatorResponderTracer debugStateVar peerSelectionActions) $ \governorThread -> do + (do + labelThisThread "Peer selection governor" + peerSelectionGovernor' dtDebugPeerSelectionInitiatorResponderTracer debugStateVar peerSelectionActions) $ \governorThread -> do -- begin, unique to InitiatorAndResponder mode: traceWith tracer (RunServer addresses) -- end, unique to ... - Async.withAsync peerChurnGovernor' $ \churnGovernorThread -> + Async.withAsync (do + labelThisThread "Peer churn governor" + peerChurnGovernor') $ \churnGovernorThread -> -- wait for any thread to fail: snd <$> Async.waitAny [ledgerPeersThread, localRootPeersProvider, governorThread, churnGovernorThread, inboundGovernorThread] diff --git a/ouroboros-network/src/Ouroboros/Network/PeerSelection/LedgerPeers.hs b/ouroboros-network/src/Ouroboros/Network/PeerSelection/LedgerPeers.hs index 883ba1d0be5..919b780e2b7 100644 --- a/ouroboros-network/src/Ouroboros/Network/PeerSelection/LedgerPeers.hs +++ b/ouroboros-network/src/Ouroboros/Network/PeerSelection/LedgerPeers.hs @@ -42,6 +42,7 @@ module Ouroboros.Network.PeerSelection.LedgerPeers import Control.Monad (when) import Control.Monad.Class.MonadAsync +import Control.Monad.Class.MonadFork import Control.Monad.Class.MonadTime.SI import Control.Tracer (Tracer, traceWith) import Data.IP qualified as IP @@ -452,5 +453,7 @@ withLedgerPeers peerActionsDNS atomically $ putTMVar reqVar (numberOfPeers, ledgerPeersKind) atomically $ takeTMVar respVar withAsync - (ledgerPeersThread peerActionsDNS ledgerPeerArgs getRequest putResponse) + (do + labelThisThread "ledger-peers" + ledgerPeersThread peerActionsDNS ledgerPeerArgs getRequest putResponse) $ \ thread -> k request thread diff --git a/ouroboros-network/src/Ouroboros/Network/PeerSelection/PeerSelectionActions.hs b/ouroboros-network/src/Ouroboros/Network/PeerSelection/PeerSelectionActions.hs index 7aa5b6e4131..650d4effcf7 100644 --- a/ouroboros-network/src/Ouroboros/Network/PeerSelection/PeerSelectionActions.hs +++ b/ouroboros-network/src/Ouroboros/Network/PeerSelection/PeerSelectionActions.hs @@ -21,6 +21,7 @@ import Control.Applicative (Alternative) import Control.Concurrent.Class.MonadMVar (MonadMVar (..)) import Control.Concurrent.Class.MonadSTM.Strict import Control.Monad.Class.MonadAsync +import Control.Monad.Class.MonadFork import Control.Monad.Class.MonadThrow import Control.Monad.Class.MonadTime.SI import Control.Monad.Class.MonadTimer.SI @@ -146,7 +147,9 @@ withPeerSelectionActions updateOutboundConnectionsState, readLedgerPeerSnapshot } withAsync - (localRootPeersProvider + (do + labelThisThread "local-roots-peers" + localRootPeersProvider localTracer toPeerAddr -- NOTE: we don't set `resolvConcurrent` because diff --git a/ouroboros-network/src/Ouroboros/Network/PeerSelection/RootPeersDNS/DNSActions.hs b/ouroboros-network/src/Ouroboros/Network/PeerSelection/RootPeersDNS/DNSActions.hs index 3c5cb881b98..28b67aa7aa1 100644 --- a/ouroboros-network/src/Ouroboros/Network/PeerSelection/RootPeersDNS/DNSActions.hs +++ b/ouroboros-network/src/Ouroboros/Network/PeerSelection/RootPeersDNS/DNSActions.hs @@ -24,6 +24,7 @@ import Data.List.NonEmpty (NonEmpty (..)) import Control.Exception (IOException) import Control.Monad.Class.MonadAsync +import Control.Monad.Class.MonadFork import Control.Monad.Class.Trans () import Control.Concurrent.Class.MonadSTM.Strict @@ -323,6 +324,7 @@ ioDNSActions = -> DNS.Domain -> IO (Either DNS.DNSError [(IP, DNS.TTL)]) lookupAWithTTL resolvConf resolver domain = do + labelThisThread "lookupAWithTTL" reply <- timeout (microsecondsAsIntToDiffTime $ DNS.resolvTimeout resolvConf) (DNS.lookupRaw resolver domain DNS.A) @@ -346,6 +348,7 @@ ioDNSActions = -> DNS.Domain -> IO (Either DNS.DNSError [(IP, DNS.TTL)]) lookupAAAAWithTTL resolvConf resolver domain = do + labelThisThread "lookupAAAAWithTTL" reply <- timeout (microsecondsAsIntToDiffTime $ DNS.resolvTimeout resolvConf) (DNS.lookupRaw resolver domain DNS.AAAA) diff --git a/ouroboros-network/src/Ouroboros/Network/PeerSelection/RootPeersDNS/PublicRootPeers.hs b/ouroboros-network/src/Ouroboros/Network/PeerSelection/RootPeersDNS/PublicRootPeers.hs index 5179a2624b2..38e2b44c558 100644 --- a/ouroboros-network/src/Ouroboros/Network/PeerSelection/RootPeersDNS/PublicRootPeers.hs +++ b/ouroboros-network/src/Ouroboros/Network/PeerSelection/RootPeersDNS/PublicRootPeers.hs @@ -17,6 +17,7 @@ import Data.Word (Word32) import Control.Concurrent.Class.MonadSTM.Strict import Control.Monad (when) import Control.Monad.Class.MonadAsync +import Control.Monad.Class.MonadFork import Control.Monad.Class.MonadThrow import Control.Monad.Class.MonadTime.SI import Control.Tracer (Tracer (..), traceWith) @@ -100,11 +101,14 @@ publicRootPeersProvider tracer Right resolver -> do let lookups = [ ((DomainAccessPoint domain port, pa),) - <$> withDNSSemaphore dnsSemaphore + <$> (do + labelThisThread "dnsLookupWithTTL" + withDNSSemaphore dnsSemaphore (dnsLookupWithTTL resolvConf resolver domain) + ) | (RelayAccessDomain domain port, pa) <- Map.assocs domains ] -- The timeouts here are handled by the 'lookupWithTTL'. They're -- configured via the DNS.ResolvConf resolvTimeout field and defaults