Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Diffusion improvements #5070

Merged
merged 7 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ import Ouroboros.Cardano.Network.PeerSelection.Governor.PeerSelectionState quali
import Ouroboros.Cardano.Network.PublicRootPeers qualified as Cardano
import Ouroboros.Cardano.Network.PublicRootPeers qualified as ExtraPeers
import Ouroboros.Network.Block (BlockNo (..))
import Ouroboros.Network.Diffusion.Common (isFatal)
import Ouroboros.Network.Diffusion.P2P (isFatal)
import Test.Ouroboros.Network.ConnectionManager.Timeouts
import Test.Ouroboros.Network.ConnectionManager.Utils
import Test.Ouroboros.Network.Diffusion.Testnet.Node (config_REPROMOTE_DELAY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ import Ouroboros.Cardano.PeerSelection.PeerSelectionActions
(requestPublicRootPeers)
import Ouroboros.Network.BlockFetch (FetchMode (..), PraosFetchMode (..),
TraceFetchClientState, TraceLabelPeer (..))
import Ouroboros.Network.Diffusion.Common qualified as Diff.P2P
import Ouroboros.Network.Diffusion.P2P qualified as Diff.P2P
import Ouroboros.Network.PeerSelection.PeerAdvertise (PeerAdvertise (..))
import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing)
import Ouroboros.Network.PeerSelection.RelayAccessPoint (DomainAccessPoint (..),
Expand Down Expand Up @@ -1229,7 +1229,7 @@ diffusionSimulation
, Churn.consensusMode = consensusMode
}

arguments :: Node.Arguments (Cardano.ExtraArguments m) (Churn.ExtraArguments m) PeerTrustable m
arguments :: Node.Arguments (Churn.ExtraArguments m) PeerTrustable m
arguments =
Node.Arguments
{ Node.aIPAddress = addr
Expand All @@ -1250,7 +1250,6 @@ diffusionSimulation
, Node.aDNSLookupDelayScript = dnsLookupDelay
, Node.aDebugTracer = (\s -> WithTime (Time (-1)) (WithName addr (DiffusionDebugTrace s)))
`contramap` nodeTracer
, Node.aExtraArgs = cardanoExtraArgs
, Node.aExtraChurnArgs = cardanoChurnArgs
}

Expand All @@ -1267,16 +1266,10 @@ diffusionSimulation
interfaces
arguments
(ExtraState.empty consensusMode (NumberOfBigLedgerPeers 0))
(Cardano.cardanoExtraArgsToPeerSelectionActions cardanoExtraArgs)
ExtraSizes.empty
Cardano.cardanoPublicRootPeersAPI
(Cardano.cardanoPeerSelectionGovernorArgs
readUseLedgerPeers
peerSharing
( Cardano.updateOutboundConnectionsState
$ lpExtraAPI
$ Node.iLedgerPeersConsensusInterface
$ interfaces)
(Cardano.cardanoExtraArgsToPeerSelectionActions cardanoExtraArgs)
)
Cardano.cardanoPeerSelectionStatetoCounters
(flip Cardano.ExtraPeers Set.empty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ import Ouroboros.Network.BlockFetch.ConsensusInterface
(ChainSelStarvation (ChainSelStarvationEndedAt))
import Ouroboros.Network.ConnectionManager.State (ConnStateIdSupply)
import Ouroboros.Network.Diffusion.Common qualified as Common
import Ouroboros.Network.Diffusion.P2P (runM)
import Ouroboros.Network.Diffusion.P2P qualified as P2P
import Ouroboros.Network.PeerSelection.Churn (PeerChurnArgs)
import Ouroboros.Network.PeerSelection.Governor.Types
(PeerSelectionGovernorArgs)
Expand Down Expand Up @@ -140,7 +140,7 @@ data Interfaces extraAPI m = Interfaces
type NtNFD m = FD m NtNAddr
type NtCFD m = FD m NtCAddr

data Arguments extraArgs extraChurnArgs extraFlags m = Arguments
data Arguments extraChurnArgs extraFlags m = Arguments
{ aIPAddress :: NtNAddr
, aAcceptedLimits :: AcceptedConnectionsLimit
, aDiffusionMode :: DiffusionMode
Expand All @@ -161,7 +161,6 @@ data Arguments extraArgs extraChurnArgs extraFlags m = Arguments
, aDNSTimeoutScript :: Script DNSTimeout
, aDNSLookupDelayScript :: Script DNSLookupDelay
, aDebugTracer :: Tracer m String
, aExtraArgs :: extraArgs
, aExtraChurnArgs :: extraChurnArgs
}

Expand All @@ -170,7 +169,7 @@ data Arguments extraArgs extraChurnArgs extraFlags m = Arguments
--
type ResolverException = SomeException

run :: forall extraArgs extraState extraDebugState extraActions extraAPI
run :: forall extraState extraDebugState extraAPI
extraPeers extraFlags extraChurnArgs extraCounters
exception resolver resolverError m.
( Alternative (STM m)
Expand Down Expand Up @@ -201,16 +200,14 @@ run :: forall extraArgs extraState extraDebugState extraActions extraAPI
=> Node.BlockGeneratorArgs Block StdGen
-> Node.LimitsAndTimeouts BlockHeader Block
-> Interfaces extraAPI m
-> Arguments extraArgs extraChurnArgs extraFlags m
-> Arguments extraChurnArgs extraFlags m
-> extraState
-> extraActions
-> extraCounters
-> PublicExtraPeersAPI extraPeers NtNAddr
-> (forall muxMode responderCtx ntnVersionData bytes a b .
PeerSelectionGovernorArgs
extraState
extraDebugState
extraActions
extraFlags
extraPeers
extraAPI
Expand Down Expand Up @@ -247,14 +244,14 @@ run :: forall extraArgs extraState extraDebugState extraActions extraAPI
extraCounters
NtNAddr
-> m Void)
-> Common.TracersExtra NtNAddr NtNVersion NtNVersionData
NtCAddr NtCVersion NtCVersionData
ResolverException extraState extraDebugState extraFlags
extraPeers extraCounters m
-> P2P.TracersExtra NtNAddr NtNVersion NtNVersionData
NtCAddr NtCVersion NtCVersionData
ResolverException extraState extraDebugState extraFlags
extraPeers extraCounters m
-> Tracer m (TraceLabelPeer NtNAddr (TraceFetchClientState BlockHeader))
-> m Void
run blockGeneratorArgs limits ni na
emptyExtraState extraActions emptyExtraCounters
emptyExtraState emptyExtraCounters
extraPeersAPI psArgs psToExtraCounters
toExtraPeers requestPublicRootPeers peerChurnGovernor
tracersExtra tracerBlockFetch =
Expand All @@ -265,16 +262,16 @@ run blockGeneratorArgs limits ni na
peerMetrics <- newPeerMetric PeerMetricsConfiguration { maxEntriesToTrack = 180 }

let -- diffusion interfaces
interfaces :: Common.Interfaces (NtNFD m) NtNAddr NtNVersion NtNVersionData
interfaces :: P2P.Interfaces (NtNFD m) NtNAddr NtNVersion NtNVersionData
(NtCFD m) NtCAddr NtCVersion NtCVersionData
resolver ResolverException extraState extraFlags extraPeers extraAPI m
interfaces = Common.Interfaces
{ Common.diNtnSnocket = iNtnSnocket ni
, Common.diNtnBearer = iNtnBearer ni
, Common.diNtnConfigureSocket = \_ _ -> return ()
, Common.diNtnConfigureSystemdSocket
interfaces = P2P.Interfaces
{ P2P.diNtnSnocket = iNtnSnocket ni
, P2P.diNtnBearer = iNtnBearer ni
, P2P.diNtnConfigureSocket = \_ _ -> return ()
, P2P.diNtnConfigureSystemdSocket
= \_ _ -> return ()
, Common.diNtnHandshakeArguments =
, P2P.diNtnHandshakeArguments =
HandshakeArguments
{ haHandshakeTracer = nullTracer
, haHandshakeCodec = unversionedHandshakeCodec
Expand All @@ -283,16 +280,16 @@ run blockGeneratorArgs limits ni na
, haQueryVersion = const False
, haTimeLimits = timeLimitsHandshake
}
, Common.diNtnAddressType = ntnAddressType
, Common.diNtnDataFlow = \NtNVersionData { ntnDiffusionMode } ->
, P2P.diNtnAddressType = ntnAddressType
, P2P.diNtnDataFlow = \NtNVersionData { ntnDiffusionMode } ->
case ntnDiffusionMode of
InitiatorOnlyDiffusionMode -> Unidirectional
InitiatorAndResponderDiffusionMode -> Duplex
, Common.diNtnPeerSharing = ntnPeerSharing
, Common.diNtnToPeerAddr = \a b -> TestAddress (Node.IPAddr a b)
, Common.diNtcSnocket = iNtcSnocket ni
, Common.diNtcBearer = iNtcBearer ni
, Common.diNtcHandshakeArguments =
, P2P.diNtnPeerSharing = ntnPeerSharing
, P2P.diNtnToPeerAddr = \a b -> TestAddress (Node.IPAddr a b)
, P2P.diNtcSnocket = iNtcSnocket ni
, P2P.diNtcBearer = iNtcBearer ni
, P2P.diNtcHandshakeArguments =
HandshakeArguments
{ haHandshakeTracer = nullTracer
, haHandshakeCodec = unversionedHandshakeCodec
Expand All @@ -301,44 +298,44 @@ run blockGeneratorArgs limits ni na
, haQueryVersion = const False
, haTimeLimits = noTimeLimitsHandshake
}
, Common.diNtcGetFileDescriptor = \_ -> pure invalidFileDescriptor
, Common.diRng = diffStgGen
, Common.diInstallSigUSR1Handler = \_ _ _ -> pure ()
, Common.diDnsActions = const (mockDNSActions
, P2P.diNtcGetFileDescriptor = \_ -> pure invalidFileDescriptor
, P2P.diRng = diffStgGen
, P2P.diInstallSigUSR1Handler = \_ _ _ -> pure ()
, P2P.diDnsActions = const (mockDNSActions
(iDomainMap ni)
dnsTimeoutScriptVar
dnsLookupDelayScriptVar)
, Common.diUpdateVersionData = \versionData diffusionMode ->
, P2P.diUpdateVersionData = \versionData diffusionMode ->
versionData { ntnDiffusionMode = diffusionMode }
, Common.diConnStateIdSupply = iConnStateIdSupply ni
, P2P.diConnStateIdSupply = iConnStateIdSupply ni
}

appsExtra :: Common.ApplicationsExtra NtNAddr m ()
appsExtra = Common.ApplicationsExtra
appsExtra :: P2P.ApplicationsExtra NtNAddr m ()
appsExtra = P2P.ApplicationsExtra
{ -- TODO: simulation errors should be critical
Common.daRethrowPolicy =
P2P.daRethrowPolicy =
muxErrorRethrowPolicy
<> ioErrorRethrowPolicy

-- we are not using local connections, so we can make all the
-- errors fatal.
, Common.daLocalRethrowPolicy =
, P2P.daLocalRethrowPolicy =
mkRethrowPolicy
(\ _ (_ :: SomeException) -> ShutdownNode)
, Common.daPeerMetrics = peerMetrics
, P2P.daPeerMetrics = peerMetrics
-- fetch mode is not used (no block-fetch mini-protocol)
, Common.daReturnPolicy = \_ -> config_REPROMOTE_DELAY
, Common.daPeerSharingRegistry = nkPeerSharingRegistry nodeKernel
, P2P.daReturnPolicy = \_ -> config_REPROMOTE_DELAY
, P2P.daPeerSharingRegistry = nkPeerSharingRegistry nodeKernel
}

let apps = Node.applications (aDebugTracer na) nodeKernel Node.cborCodecs limits appArgs blockHeader

withAsync
(runM interfaces
Common.nullTracers
tracersExtra
(mkArgs (nkPublicPeerSelectionVar nodeKernel))
argsExtra apps appsExtra)
(P2P.runM interfaces
Common.nullTracers
tracersExtra
(mkArgs (nkPublicPeerSelectionVar nodeKernel))
argsExtra apps appsExtra)
$ \ diffusionThread ->
withAsync (blockFetch nodeKernel) $ \blockFetchLogicThread ->
wait diffusionThread
Expand Down Expand Up @@ -456,33 +453,31 @@ run blockGeneratorArgs limits ni na
, Common.daPublicPeerSelectionVar
}

argsExtra :: Common.ArgumentsExtra
extraArgs extraState extraDebugState extraActions
argsExtra :: P2P.ArgumentsExtra
extraState extraDebugState
extraFlags extraPeers extraAPI
extraChurnArgs extraCounters exception
NtNAddr resolver resolverError m
argsExtra = Common.ArgumentsExtra
{ Common.daPeerSelectionTargets = aPeerTargets na
, Common.daReadLocalRootPeers = aReadLocalRootPeers na
, Common.daReadPublicRootPeers = aReadPublicRootPeers na
, Common.daOwnPeerSharing = aOwnPeerSharing na
, Common.daReadUseLedgerPeers = aReadUseLedgerPeers na
, Common.daProtocolIdleTimeout = aProtocolIdleTimeout na
, Common.daTimeWaitTimeout = aTimeWaitTimeout na
, Common.daDeadlineChurnInterval = 3300
, Common.daBulkChurnInterval = 300
, Common.daReadLedgerPeerSnapshot = pure Nothing -- ^ tested independently
, Common.daEmptyExtraState = emptyExtraState
, Common.daEmptyExtraCounters = emptyExtraCounters
, Common.daExtraPeersAPI = extraPeersAPI
, Common.daExtraActions = extraActions
, Common.daExtraChurnArgs = aExtraChurnArgs na
, Common.daExtraArgs = aExtraArgs na
, Common.daToExtraPeers = toExtraPeers
, Common.daRequestPublicRootPeers = Just requestPublicRootPeers
, Common.daPeerChurnGovernor = peerChurnGovernor
, Common.daPeerSelectionGovernorArgs = psArgs
, Common.daPeerSelectionStateToExtraCounters = psToExtraCounters
argsExtra = P2P.ArgumentsExtra
{ P2P.daPeerSelectionTargets = aPeerTargets na
, P2P.daReadLocalRootPeers = aReadLocalRootPeers na
, P2P.daReadPublicRootPeers = aReadPublicRootPeers na
, P2P.daOwnPeerSharing = aOwnPeerSharing na
, P2P.daReadUseLedgerPeers = aReadUseLedgerPeers na
, P2P.daProtocolIdleTimeout = aProtocolIdleTimeout na
, P2P.daTimeWaitTimeout = aTimeWaitTimeout na
, P2P.daDeadlineChurnInterval = 3300
, P2P.daBulkChurnInterval = 300
, P2P.daReadLedgerPeerSnapshot = pure Nothing -- ^ tested independently
, P2P.daEmptyExtraState = emptyExtraState
, P2P.daEmptyExtraCounters = emptyExtraCounters
, P2P.daExtraPeersAPI = extraPeersAPI
, P2P.daExtraChurnArgs = aExtraChurnArgs na
, P2P.daToExtraPeers = toExtraPeers
, P2P.daRequestPublicRootPeers = Just requestPublicRootPeers
, P2P.daPeerChurnGovernor = peerChurnGovernor
, P2P.daPeerSelectionGovernorArgs = psArgs
, P2P.daPeerSelectionStateToExtraCounters = psToExtraCounters
}

appArgs :: Node.AppArgs extraAPI BlockHeader Block m
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4011,9 +4011,10 @@ _governorFindingPublicRoots targetNumberOfRootPeers readDomains readUseBootstrap

peerSelectionGovernorArgs =
Cardano.cardanoPeerSelectionGovernorArgs
(return DontUseLedgerPeers)
peerSharing
(Cardano.updateOutboundConnectionsState (lpExtraAPI (getLedgerStateCtx actions)))
Cardano.ExtraPeerSelectionActions {
genesisPeerTargets = targets,
readUseBootstrapPeers = readUseBootstrapPeers
}


publicRootPeersProvider
Expand Down Expand Up @@ -4045,7 +4046,6 @@ _governorFindingPublicRoots targetNumberOfRootPeers readDomains readUseBootstrap
actions
:: PeerSelectionActions
Cardano.ExtraState
(Cardano.ExtraPeerSelectionActions IO)
PeerTrustable
(Cardano.ExtraPeers SockAddr)
(Cardano.LedgerPeersConsensusInterface IO)
Expand Down Expand Up @@ -4083,10 +4083,6 @@ _governorFindingPublicRoots targetNumberOfRootPeers readDomains readUseBootstrap
},
peerSelectionTargets = targets,
readLedgerPeerSnapshot = pure Nothing,
extraActions = Cardano.ExtraPeerSelectionActions {
genesisPeerTargets = targets,
readUseBootstrapPeers = readUseBootstrapPeers
},
extraStateToExtraCounters = ExtraSizes.cardanoPeerSelectionStatetoCounters,
extraPeersAPI = ExtraPeers.cardanoPublicRootPeersAPI
}
Expand Down
Loading