Skip to content

Commit

Permalink
Merge pull request #5070 from IntersectMBO/coot/bolt12/reusable-diffu…
Browse files Browse the repository at this point in the history
…sion-3

Diffusion improvements
  • Loading branch information
coot authored Feb 11, 2025
2 parents 7e02959 + d700faa commit 1e6f14a
Show file tree
Hide file tree
Showing 21 changed files with 765 additions and 864 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ import Ouroboros.Cardano.Network.PeerSelection.Governor.PeerSelectionState quali
import Ouroboros.Cardano.Network.PublicRootPeers qualified as Cardano
import Ouroboros.Cardano.Network.PublicRootPeers qualified as ExtraPeers
import Ouroboros.Network.Block (BlockNo (..))
import Ouroboros.Network.Diffusion.Common (isFatal)
import Ouroboros.Network.Diffusion.P2P (isFatal)
import Test.Ouroboros.Network.ConnectionManager.Timeouts
import Test.Ouroboros.Network.ConnectionManager.Utils
import Test.Ouroboros.Network.Diffusion.Testnet.Node (config_REPROMOTE_DELAY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ import Ouroboros.Cardano.PeerSelection.PeerSelectionActions
(requestPublicRootPeers)
import Ouroboros.Network.BlockFetch (FetchMode (..), PraosFetchMode (..),
TraceFetchClientState, TraceLabelPeer (..))
import Ouroboros.Network.Diffusion.Common qualified as Diff.P2P
import Ouroboros.Network.Diffusion.P2P qualified as Diff.P2P
import Ouroboros.Network.PeerSelection.PeerAdvertise (PeerAdvertise (..))
import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing)
import Ouroboros.Network.PeerSelection.RelayAccessPoint (DomainAccessPoint (..),
Expand Down Expand Up @@ -1229,7 +1229,7 @@ diffusionSimulation
, Churn.consensusMode = consensusMode
}

arguments :: Node.Arguments (Cardano.ExtraArguments m) (Churn.ExtraArguments m) PeerTrustable m
arguments :: Node.Arguments (Churn.ExtraArguments m) PeerTrustable m
arguments =
Node.Arguments
{ Node.aIPAddress = addr
Expand All @@ -1250,7 +1250,6 @@ diffusionSimulation
, Node.aDNSLookupDelayScript = dnsLookupDelay
, Node.aDebugTracer = (\s -> WithTime (Time (-1)) (WithName addr (DiffusionDebugTrace s)))
`contramap` nodeTracer
, Node.aExtraArgs = cardanoExtraArgs
, Node.aExtraChurnArgs = cardanoChurnArgs
}

Expand All @@ -1267,16 +1266,10 @@ diffusionSimulation
interfaces
arguments
(ExtraState.empty consensusMode (NumberOfBigLedgerPeers 0))
(Cardano.cardanoExtraArgsToPeerSelectionActions cardanoExtraArgs)
ExtraSizes.empty
Cardano.cardanoPublicRootPeersAPI
(Cardano.cardanoPeerSelectionGovernorArgs
readUseLedgerPeers
peerSharing
( Cardano.updateOutboundConnectionsState
$ lpExtraAPI
$ Node.iLedgerPeersConsensusInterface
$ interfaces)
(Cardano.cardanoExtraArgsToPeerSelectionActions cardanoExtraArgs)
)
Cardano.cardanoPeerSelectionStatetoCounters
(flip Cardano.ExtraPeers Set.empty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ import Ouroboros.Network.BlockFetch.ConsensusInterface
(ChainSelStarvation (ChainSelStarvationEndedAt))
import Ouroboros.Network.ConnectionManager.State (ConnStateIdSupply)
import Ouroboros.Network.Diffusion.Common qualified as Common
import Ouroboros.Network.Diffusion.P2P (runM)
import Ouroboros.Network.Diffusion.P2P qualified as P2P
import Ouroboros.Network.PeerSelection.Churn (PeerChurnArgs)
import Ouroboros.Network.PeerSelection.Governor.Types
(PeerSelectionGovernorArgs)
Expand Down Expand Up @@ -140,7 +140,7 @@ data Interfaces extraAPI m = Interfaces
type NtNFD m = FD m NtNAddr
type NtCFD m = FD m NtCAddr

data Arguments extraArgs extraChurnArgs extraFlags m = Arguments
data Arguments extraChurnArgs extraFlags m = Arguments
{ aIPAddress :: NtNAddr
, aAcceptedLimits :: AcceptedConnectionsLimit
, aDiffusionMode :: DiffusionMode
Expand All @@ -161,7 +161,6 @@ data Arguments extraArgs extraChurnArgs extraFlags m = Arguments
, aDNSTimeoutScript :: Script DNSTimeout
, aDNSLookupDelayScript :: Script DNSLookupDelay
, aDebugTracer :: Tracer m String
, aExtraArgs :: extraArgs
, aExtraChurnArgs :: extraChurnArgs
}

Expand All @@ -170,7 +169,7 @@ data Arguments extraArgs extraChurnArgs extraFlags m = Arguments
--
type ResolverException = SomeException

run :: forall extraArgs extraState extraDebugState extraActions extraAPI
run :: forall extraState extraDebugState extraAPI
extraPeers extraFlags extraChurnArgs extraCounters
exception resolver resolverError m.
( Alternative (STM m)
Expand Down Expand Up @@ -201,16 +200,14 @@ run :: forall extraArgs extraState extraDebugState extraActions extraAPI
=> Node.BlockGeneratorArgs Block StdGen
-> Node.LimitsAndTimeouts BlockHeader Block
-> Interfaces extraAPI m
-> Arguments extraArgs extraChurnArgs extraFlags m
-> Arguments extraChurnArgs extraFlags m
-> extraState
-> extraActions
-> extraCounters
-> PublicExtraPeersAPI extraPeers NtNAddr
-> (forall muxMode responderCtx ntnVersionData bytes a b .
PeerSelectionGovernorArgs
extraState
extraDebugState
extraActions
extraFlags
extraPeers
extraAPI
Expand Down Expand Up @@ -247,14 +244,14 @@ run :: forall extraArgs extraState extraDebugState extraActions extraAPI
extraCounters
NtNAddr
-> m Void)
-> Common.TracersExtra NtNAddr NtNVersion NtNVersionData
NtCAddr NtCVersion NtCVersionData
ResolverException extraState extraDebugState extraFlags
extraPeers extraCounters m
-> P2P.TracersExtra NtNAddr NtNVersion NtNVersionData
NtCAddr NtCVersion NtCVersionData
ResolverException extraState extraDebugState extraFlags
extraPeers extraCounters m
-> Tracer m (TraceLabelPeer NtNAddr (TraceFetchClientState BlockHeader))
-> m Void
run blockGeneratorArgs limits ni na
emptyExtraState extraActions emptyExtraCounters
emptyExtraState emptyExtraCounters
extraPeersAPI psArgs psToExtraCounters
toExtraPeers requestPublicRootPeers peerChurnGovernor
tracersExtra tracerBlockFetch =
Expand All @@ -265,16 +262,16 @@ run blockGeneratorArgs limits ni na
peerMetrics <- newPeerMetric PeerMetricsConfiguration { maxEntriesToTrack = 180 }

let -- diffusion interfaces
interfaces :: Common.Interfaces (NtNFD m) NtNAddr NtNVersion NtNVersionData
interfaces :: P2P.Interfaces (NtNFD m) NtNAddr NtNVersion NtNVersionData
(NtCFD m) NtCAddr NtCVersion NtCVersionData
resolver ResolverException extraState extraFlags extraPeers extraAPI m
interfaces = Common.Interfaces
{ Common.diNtnSnocket = iNtnSnocket ni
, Common.diNtnBearer = iNtnBearer ni
, Common.diNtnConfigureSocket = \_ _ -> return ()
, Common.diNtnConfigureSystemdSocket
interfaces = P2P.Interfaces
{ P2P.diNtnSnocket = iNtnSnocket ni
, P2P.diNtnBearer = iNtnBearer ni
, P2P.diNtnConfigureSocket = \_ _ -> return ()
, P2P.diNtnConfigureSystemdSocket
= \_ _ -> return ()
, Common.diNtnHandshakeArguments =
, P2P.diNtnHandshakeArguments =
HandshakeArguments
{ haHandshakeTracer = nullTracer
, haHandshakeCodec = unversionedHandshakeCodec
Expand All @@ -283,16 +280,16 @@ run blockGeneratorArgs limits ni na
, haQueryVersion = const False
, haTimeLimits = timeLimitsHandshake
}
, Common.diNtnAddressType = ntnAddressType
, Common.diNtnDataFlow = \NtNVersionData { ntnDiffusionMode } ->
, P2P.diNtnAddressType = ntnAddressType
, P2P.diNtnDataFlow = \NtNVersionData { ntnDiffusionMode } ->
case ntnDiffusionMode of
InitiatorOnlyDiffusionMode -> Unidirectional
InitiatorAndResponderDiffusionMode -> Duplex
, Common.diNtnPeerSharing = ntnPeerSharing
, Common.diNtnToPeerAddr = \a b -> TestAddress (Node.IPAddr a b)
, Common.diNtcSnocket = iNtcSnocket ni
, Common.diNtcBearer = iNtcBearer ni
, Common.diNtcHandshakeArguments =
, P2P.diNtnPeerSharing = ntnPeerSharing
, P2P.diNtnToPeerAddr = \a b -> TestAddress (Node.IPAddr a b)
, P2P.diNtcSnocket = iNtcSnocket ni
, P2P.diNtcBearer = iNtcBearer ni
, P2P.diNtcHandshakeArguments =
HandshakeArguments
{ haHandshakeTracer = nullTracer
, haHandshakeCodec = unversionedHandshakeCodec
Expand All @@ -301,44 +298,44 @@ run blockGeneratorArgs limits ni na
, haQueryVersion = const False
, haTimeLimits = noTimeLimitsHandshake
}
, Common.diNtcGetFileDescriptor = \_ -> pure invalidFileDescriptor
, Common.diRng = diffStgGen
, Common.diInstallSigUSR1Handler = \_ _ _ -> pure ()
, Common.diDnsActions = const (mockDNSActions
, P2P.diNtcGetFileDescriptor = \_ -> pure invalidFileDescriptor
, P2P.diRng = diffStgGen
, P2P.diInstallSigUSR1Handler = \_ _ _ -> pure ()
, P2P.diDnsActions = const (mockDNSActions
(iDomainMap ni)
dnsTimeoutScriptVar
dnsLookupDelayScriptVar)
, Common.diUpdateVersionData = \versionData diffusionMode ->
, P2P.diUpdateVersionData = \versionData diffusionMode ->
versionData { ntnDiffusionMode = diffusionMode }
, Common.diConnStateIdSupply = iConnStateIdSupply ni
, P2P.diConnStateIdSupply = iConnStateIdSupply ni
}

appsExtra :: Common.ApplicationsExtra NtNAddr m ()
appsExtra = Common.ApplicationsExtra
appsExtra :: P2P.ApplicationsExtra NtNAddr m ()
appsExtra = P2P.ApplicationsExtra
{ -- TODO: simulation errors should be critical
Common.daRethrowPolicy =
P2P.daRethrowPolicy =
muxErrorRethrowPolicy
<> ioErrorRethrowPolicy

-- we are not using local connections, so we can make all the
-- errors fatal.
, Common.daLocalRethrowPolicy =
, P2P.daLocalRethrowPolicy =
mkRethrowPolicy
(\ _ (_ :: SomeException) -> ShutdownNode)
, Common.daPeerMetrics = peerMetrics
, P2P.daPeerMetrics = peerMetrics
-- fetch mode is not used (no block-fetch mini-protocol)
, Common.daReturnPolicy = \_ -> config_REPROMOTE_DELAY
, Common.daPeerSharingRegistry = nkPeerSharingRegistry nodeKernel
, P2P.daReturnPolicy = \_ -> config_REPROMOTE_DELAY
, P2P.daPeerSharingRegistry = nkPeerSharingRegistry nodeKernel
}

let apps = Node.applications (aDebugTracer na) nodeKernel Node.cborCodecs limits appArgs blockHeader

withAsync
(runM interfaces
Common.nullTracers
tracersExtra
(mkArgs (nkPublicPeerSelectionVar nodeKernel))
argsExtra apps appsExtra)
(P2P.runM interfaces
Common.nullTracers
tracersExtra
(mkArgs (nkPublicPeerSelectionVar nodeKernel))
argsExtra apps appsExtra)
$ \ diffusionThread ->
withAsync (blockFetch nodeKernel) $ \blockFetchLogicThread ->
wait diffusionThread
Expand Down Expand Up @@ -456,33 +453,31 @@ run blockGeneratorArgs limits ni na
, Common.daPublicPeerSelectionVar
}

argsExtra :: Common.ArgumentsExtra
extraArgs extraState extraDebugState extraActions
argsExtra :: P2P.ArgumentsExtra
extraState extraDebugState
extraFlags extraPeers extraAPI
extraChurnArgs extraCounters exception
NtNAddr resolver resolverError m
argsExtra = Common.ArgumentsExtra
{ Common.daPeerSelectionTargets = aPeerTargets na
, Common.daReadLocalRootPeers = aReadLocalRootPeers na
, Common.daReadPublicRootPeers = aReadPublicRootPeers na
, Common.daOwnPeerSharing = aOwnPeerSharing na
, Common.daReadUseLedgerPeers = aReadUseLedgerPeers na
, Common.daProtocolIdleTimeout = aProtocolIdleTimeout na
, Common.daTimeWaitTimeout = aTimeWaitTimeout na
, Common.daDeadlineChurnInterval = 3300
, Common.daBulkChurnInterval = 300
, Common.daReadLedgerPeerSnapshot = pure Nothing -- ^ tested independently
, Common.daEmptyExtraState = emptyExtraState
, Common.daEmptyExtraCounters = emptyExtraCounters
, Common.daExtraPeersAPI = extraPeersAPI
, Common.daExtraActions = extraActions
, Common.daExtraChurnArgs = aExtraChurnArgs na
, Common.daExtraArgs = aExtraArgs na
, Common.daToExtraPeers = toExtraPeers
, Common.daRequestPublicRootPeers = Just requestPublicRootPeers
, Common.daPeerChurnGovernor = peerChurnGovernor
, Common.daPeerSelectionGovernorArgs = psArgs
, Common.daPeerSelectionStateToExtraCounters = psToExtraCounters
argsExtra = P2P.ArgumentsExtra
{ P2P.daPeerSelectionTargets = aPeerTargets na
, P2P.daReadLocalRootPeers = aReadLocalRootPeers na
, P2P.daReadPublicRootPeers = aReadPublicRootPeers na
, P2P.daOwnPeerSharing = aOwnPeerSharing na
, P2P.daReadUseLedgerPeers = aReadUseLedgerPeers na
, P2P.daProtocolIdleTimeout = aProtocolIdleTimeout na
, P2P.daTimeWaitTimeout = aTimeWaitTimeout na
, P2P.daDeadlineChurnInterval = 3300
, P2P.daBulkChurnInterval = 300
, P2P.daReadLedgerPeerSnapshot = pure Nothing -- ^ tested independently
, P2P.daEmptyExtraState = emptyExtraState
, P2P.daEmptyExtraCounters = emptyExtraCounters
, P2P.daExtraPeersAPI = extraPeersAPI
, P2P.daExtraChurnArgs = aExtraChurnArgs na
, P2P.daToExtraPeers = toExtraPeers
, P2P.daRequestPublicRootPeers = Just requestPublicRootPeers
, P2P.daPeerChurnGovernor = peerChurnGovernor
, P2P.daPeerSelectionGovernorArgs = psArgs
, P2P.daPeerSelectionStateToExtraCounters = psToExtraCounters
}

appArgs :: Node.AppArgs extraAPI BlockHeader Block m
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4011,9 +4011,10 @@ _governorFindingPublicRoots targetNumberOfRootPeers readDomains readUseBootstrap

peerSelectionGovernorArgs =
Cardano.cardanoPeerSelectionGovernorArgs
(return DontUseLedgerPeers)
peerSharing
(Cardano.updateOutboundConnectionsState (lpExtraAPI (getLedgerStateCtx actions)))
Cardano.ExtraPeerSelectionActions {
genesisPeerTargets = targets,
readUseBootstrapPeers = readUseBootstrapPeers
}


publicRootPeersProvider
Expand Down Expand Up @@ -4045,7 +4046,6 @@ _governorFindingPublicRoots targetNumberOfRootPeers readDomains readUseBootstrap
actions
:: PeerSelectionActions
Cardano.ExtraState
(Cardano.ExtraPeerSelectionActions IO)
PeerTrustable
(Cardano.ExtraPeers SockAddr)
(Cardano.LedgerPeersConsensusInterface IO)
Expand Down Expand Up @@ -4083,10 +4083,6 @@ _governorFindingPublicRoots targetNumberOfRootPeers readDomains readUseBootstrap
},
peerSelectionTargets = targets,
readLedgerPeerSnapshot = pure Nothing,
extraActions = Cardano.ExtraPeerSelectionActions {
genesisPeerTargets = targets,
readUseBootstrapPeers = readUseBootstrapPeers
},
extraStateToExtraCounters = ExtraSizes.cardanoPeerSelectionStatetoCounters,
extraPeersAPI = ExtraPeers.cardanoPublicRootPeersAPI
}
Expand Down
Loading

0 comments on commit 1e6f14a

Please sign in to comment.