Skip to content

Commit

Permalink
Merge pull request #5050 from IntersectMBO/js/labels
Browse files Browse the repository at this point in the history
Label some threads
  • Loading branch information
jasagredo authored Feb 5, 2025
2 parents aaf95ea + 52fb7e4 commit a933425
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import Control.Concurrent.Class.MonadSTM.Strict
import Control.Exception (SomeAsyncException (..))
import Control.Monad (foldM)
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime.SI
import Control.Monad.Class.MonadTimer.SI
Expand Down Expand Up @@ -154,8 +155,11 @@ with
}
k
= do
labelThisThread "inbound-governor"
var <- newTVarIO (mkPublicState emptyState)
withAsync (inboundGovernorLoop var emptyState
withAsync ((do
labelThisThread "inbound-governor-loop"
inboundGovernorLoop var emptyState)
`catch`
handleError var) $
\thread ->
Expand Down
25 changes: 18 additions & 7 deletions ouroboros-network-framework/src/Ouroboros/Network/Server2.hs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import Control.Monad.Class.MonadTimer.SI
import Control.Tracer (Tracer, contramap, traceWith)

import Data.ByteString.Lazy (ByteString)
import Data.List as List (foldl')
import Data.List.NonEmpty (NonEmpty)
import Data.List.NonEmpty qualified as NonEmpty
import Data.Void (Void, absurd)
Expand Down Expand Up @@ -166,18 +165,24 @@ with Arguments {
InboundGovernor.idleTimeout = inboundIdleTimeout,
InboundGovernor.connectionManager = connectionManager
} $ \inboundGovernorThread readPublicInboundState ->
withAsync (k inboundGovernorThread readPublicInboundState) $ \actionThread -> do
withAsync (do
labelThisThread "Server2 (ouroboros-network-framework)"
k inboundGovernorThread readPublicInboundState) $ \actionThread -> do
let acceptLoops :: [m Void]
acceptLoops =
[ (accept snocket socket >>= acceptLoop localAddress)
[ (do
labelThisThread ("accept " ++ show localAddress)
accept snocket socket >>= acceptLoop localAddress)
`finally` close snocket socket
| (localAddress, socket) <- localAddresses `zip` sockets
]
-- race all `acceptLoops` with `actionThread` and
-- `inboundGovernorThread`
List.foldl' (\as io -> fn <$> as `race` io)
(fn <$> actionThread `waitEither` inboundGovernorThread)
acceptLoops
let waiter = fn <$> (do
labelThisThread "racing-action-inbound-governor"
actionThread `waitEither` inboundGovernorThread)

(fn <$> waiter `race` (labelThisThread "racing-accept-loops" >> raceAll acceptLoops))
`finally`
traceWith tracer TrServerStopped
`catch`
Expand All @@ -191,6 +196,13 @@ with Arguments {
fn (Left x) = x
fn (Right v) = absurd v

raceAll asyncs = withAsyncAll asyncs (fmap snd . waitAny)

withAsyncAll xs0 action = go [] xs0
where
go as [] = action (reverse as)
go as (x:xs) = withAsync x (\a -> go (a:as) xs)

acceptLoop :: peerAddr
-> Accept m socket peerAddr
-> m Void
Expand Down Expand Up @@ -295,4 +307,3 @@ data Trace peerAddr
-- ^ similar to 'TrAcceptConnection' but it is logged once the connection is
-- handed to inbound connection manager, e.g. after handshake negotiation.
deriving Show

31 changes: 19 additions & 12 deletions ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ import Control.Monad.Class.MonadTimer.SI
import Control.Monad.Fix (MonadFix)
import Control.Tracer (Tracer, contramap, nullTracer, traceWith)
import Data.ByteString.Lazy (ByteString)
import Data.Foldable (asum)
import Data.Function ((&))
import Data.Hashable (Hashable)
import Data.IP (IP)
import Data.IP qualified as IP
import Data.List.NonEmpty (NonEmpty (..))
import Data.Map (Map)
import Data.Map qualified as Map
import Data.Maybe (catMaybes, maybeToList)
import Data.Maybe (catMaybes)
import Data.Proxy (Proxy (..))
import Data.Typeable (Typeable)
import Data.Void (Void)
Expand Down Expand Up @@ -703,12 +703,13 @@ runM Interfaces
-- Thread to which 'RethrowPolicy' will throw fatal exceptions.
mainThreadId <- myThreadId

Async.runConcurrently
$ asum
$ Async.Concurrently <$>
( mkRemoteThread mainThreadId
: maybeToList (mkLocalThread mainThreadId <$> daLocalAddress)
)
-- If we have a local address, race the remote and local threads. Otherwise
-- just launch the remote thread.
mkRemoteThread mainThreadId &
(case daLocalAddress of
Nothing -> id
Just addr -> (fmap (either id id) . (`Async.race` mkLocalThread mainThreadId addr))
)

where
(ledgerPeersRng, rng1) = split diRng
Expand Down Expand Up @@ -753,8 +754,9 @@ runM Interfaces
-- | mkLocalThread - create local connection manager

mkLocalThread :: ThreadId m -> Either ntcFd ntcAddr -> m Void
mkLocalThread mainThreadId localAddr =
withLocalSocket tracer diNtcGetFileDescriptor diNtcSnocket localAddr
mkLocalThread mainThreadId localAddr = do
labelThisThread "local connection manager"
withLocalSocket tracer diNtcGetFileDescriptor diNtcSnocket localAddr
$ \localSocket -> do
localInbInfoChannel <- newInformationChannel

Expand Down Expand Up @@ -832,6 +834,7 @@ runM Interfaces

mkRemoteThread :: ThreadId m -> m Void
mkRemoteThread mainThreadId = do
labelThisThread "remote connection manager"
let
exitPolicy :: ExitPolicy a
exitPolicy = stdExitPolicy daReturnPolicy
Expand Down Expand Up @@ -1177,11 +1180,15 @@ runM Interfaces
PeerSelectionActionsDiffusionMode { psPeerStateActions = peerStateActions } $
\(ledgerPeersThread, localRootPeersProvider) peerSelectionActions ->
Async.withAsync
(peerSelectionGovernor' dtDebugPeerSelectionInitiatorResponderTracer debugStateVar peerSelectionActions) $ \governorThread -> do
(do
labelThisThread "Peer selection governor"
peerSelectionGovernor' dtDebugPeerSelectionInitiatorResponderTracer debugStateVar peerSelectionActions) $ \governorThread -> do
-- begin, unique to InitiatorAndResponder mode:
traceWith tracer (RunServer addresses)
-- end, unique to ...
Async.withAsync peerChurnGovernor' $ \churnGovernorThread ->
Async.withAsync (do
labelThisThread "Peer churn governor"
peerChurnGovernor') $ \churnGovernorThread ->
-- wait for any thread to fail:
snd <$> Async.waitAny [ledgerPeersThread, localRootPeersProvider, governorThread, churnGovernorThread, inboundGovernorThread]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ module Ouroboros.Network.PeerSelection.LedgerPeers

import Control.Monad (when)
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadTime.SI
import Control.Tracer (Tracer, traceWith)
import Data.IP qualified as IP
Expand Down Expand Up @@ -452,5 +453,7 @@ withLedgerPeers peerActionsDNS
atomically $ putTMVar reqVar (numberOfPeers, ledgerPeersKind)
atomically $ takeTMVar respVar
withAsync
(ledgerPeersThread peerActionsDNS ledgerPeerArgs getRequest putResponse)
(do
labelThisThread "ledger-peers"
ledgerPeersThread peerActionsDNS ledgerPeerArgs getRequest putResponse)
$ \ thread -> k request thread
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import Control.Applicative (Alternative)
import Control.Concurrent.Class.MonadMVar (MonadMVar (..))
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime.SI
import Control.Monad.Class.MonadTimer.SI
Expand Down Expand Up @@ -146,7 +147,9 @@ withPeerSelectionActions
updateOutboundConnectionsState,
readLedgerPeerSnapshot }
withAsync
(localRootPeersProvider
(do
labelThisThread "local-roots-peers"
localRootPeersProvider
localTracer
toPeerAddr
-- NOTE: we don't set `resolvConcurrent` because
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import Data.List.NonEmpty (NonEmpty (..))

import Control.Exception (IOException)
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
import Control.Monad.Class.Trans ()

import Control.Concurrent.Class.MonadSTM.Strict
Expand Down Expand Up @@ -323,6 +324,7 @@ ioDNSActions =
-> DNS.Domain
-> IO (Either DNS.DNSError [(IP, DNS.TTL)])
lookupAWithTTL resolvConf resolver domain = do
labelThisThread "lookupAWithTTL"
reply <- timeout (microsecondsAsIntToDiffTime
$ DNS.resolvTimeout resolvConf)
(DNS.lookupRaw resolver domain DNS.A)
Expand All @@ -346,6 +348,7 @@ ioDNSActions =
-> DNS.Domain
-> IO (Either DNS.DNSError [(IP, DNS.TTL)])
lookupAAAAWithTTL resolvConf resolver domain = do
labelThisThread "lookupAAAAWithTTL"
reply <- timeout (microsecondsAsIntToDiffTime
$ DNS.resolvTimeout resolvConf)
(DNS.lookupRaw resolver domain DNS.AAAA)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import Data.Word (Word32)
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Monad (when)
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime.SI
import Control.Tracer (Tracer (..), traceWith)
Expand Down Expand Up @@ -100,11 +101,14 @@ publicRootPeersProvider tracer
Right resolver -> do
let lookups =
[ ((DomainAccessPoint domain port, pa),)
<$> withDNSSemaphore dnsSemaphore
<$> (do
labelThisThread "dnsLookupWithTTL"
withDNSSemaphore dnsSemaphore
(dnsLookupWithTTL
resolvConf
resolver
domain)
)
| (RelayAccessDomain domain port, pa) <- Map.assocs domains ]
-- The timeouts here are handled by the 'lookupWithTTL'. They're
-- configured via the DNS.ResolvConf resolvTimeout field and defaults
Expand Down

0 comments on commit a933425

Please sign in to comment.