Skip to content

Commit

Permalink
SRV implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
crocodile-dentist committed Dec 3, 2024
1 parent 7e8909f commit 3fce719
Show file tree
Hide file tree
Showing 13 changed files with 425 additions and 230 deletions.
2 changes: 1 addition & 1 deletion ouroboros-network/ouroboros-network.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ library
Ouroboros.Network.DeltaQ
Ouroboros.Network.Diffusion
Ouroboros.Network.Diffusion.Configuration
Ouroboros.Network.Diffusion.NonP2P
-- Ouroboros.Network.Diffusion.NonP2P
Ouroboros.Network.Diffusion.P2P
Ouroboros.Network.Diffusion.Policies
Ouroboros.Network.ExitPolicy
Expand Down
32 changes: 16 additions & 16 deletions ouroboros-network/src/Ouroboros/Network/Diffusion.hs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import Ouroboros.Network.NodeToNode (NodeToNodeVersion, NodeToNodeVersionData,
import Ouroboros.Network.PeerSelection.Governor.Types

import Ouroboros.Network.Diffusion.Common as Common
import Ouroboros.Network.Diffusion.NonP2P qualified as NonP2P
-- import Ouroboros.Network.Diffusion.NonP2P qualified as NonP2P
import Ouroboros.Network.Diffusion.P2P qualified as P2P

-- | Promoted data types.
Expand All @@ -52,9 +52,9 @@ data ExtraTracers (p2p :: P2P) where
IOException IO
-> ExtraTracers 'P2P

NonP2PTracers
:: NonP2P.TracersExtra
-> ExtraTracers 'NonP2P
-- NonP2PTracers
-- :: NonP2P.TracersExtra
-- -> ExtraTracers 'NonP2P


-- | Diffusion arguments which depend on p2p mode.
Expand All @@ -64,9 +64,9 @@ data ExtraArguments (p2p :: P2P) m where
:: P2P.ArgumentsExtra m
-> ExtraArguments 'P2P m

NonP2PArguments
:: NonP2P.ArgumentsExtra
-> ExtraArguments 'NonP2P m
-- NonP2PArguments
-- :: NonP2P.ArgumentsExtra
-- -> ExtraArguments 'NonP2P m


-- | Application data which depend on p2p mode.
Expand All @@ -76,9 +76,9 @@ data ExtraApplications (p2p :: P2P) ntnAddr m a where
:: P2P.ApplicationsExtra ntnAddr m a
-> ExtraApplications 'P2P ntnAddr m a

NonP2PApplications
:: NonP2P.ApplicationsExtra
-> ExtraApplications 'NonP2P ntnAddr m a
-- NonP2PApplications
-- :: NonP2P.ApplicationsExtra
-- -> ExtraApplications 'NonP2P ntnAddr m a


-- | Run data diffusion in either 'P2P' or 'NonP2P' mode.
Expand Down Expand Up @@ -107,9 +107,9 @@ run tracers (P2PTracers tracersExtra)
P2P.run tracers tracersExtra
args argsExtra
apps appsExtra
run tracers (NonP2PTracers tracersExtra)
args (NonP2PArguments argsExtra)
apps (NonP2PApplications appsExtra) =
NonP2P.run tracers tracersExtra
args argsExtra
apps appsExtra
-- run tracers (NonP2PTracers tracersExtra)
-- args (NonP2PArguments argsExtra)
-- apps (NonP2PApplications appsExtra) =
-- NonP2P.run tracers tracersExtra
-- args argsExtra
-- apps appsExtra
16 changes: 8 additions & 8 deletions ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1300,25 +1300,25 @@ run tracers tracersExtra args argsExtra apps appsExtra = do
diRng <- newStdGen
runM
Interfaces {
diNtnSnocket = Snocket.socketSnocket iocp,
diNtnSnocket = undefined, --Snocket.socketSnocket iocp,
diNtnBearer = makeSocketBearer,
diNtnConfigureSocket = configureSocket,
diNtnConfigureSystemdSocket =
configureSystemdSocket
(SystemdSocketConfiguration `contramap` tracer),
diNtnConfigureSocket = undefined, --configureSocket,
diNtnConfigureSystemdSocket = undefined,
-- configureSystemdSocket
-- (SystemdSocketConfiguration `contramap` tracer),
diNtnHandshakeArguments,
diNtnAddressType = socketAddressType,
diNtnAddressType = undefined, --socketAddressType,
diNtnDataFlow = ntnDataFlow,
diNtnPeerSharing = peerSharing,
diNtnToPeerAddr = curry IP.toSockAddr,
diNtnToPeerAddr = undefined, -- \ip port resolver -> IP.toSockAddr (ip,port),

diNtcSnocket = Snocket.localSnocket iocp,
diNtcBearer = makeLocalBearer,
diNtcHandshakeArguments,
diNtcGetFileDescriptor = localSocketFileDescriptor,

diRng,
diInstallSigUSR1Handler,
diInstallSigUSR1Handler = undefined,
diDnsActions = ioDNSActions
}
tracers tracersExtra args argsExtra apps appsExtra
Expand Down
2 changes: 1 addition & 1 deletion ouroboros-network/src/Ouroboros/Network/NodeToNode.hs
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ localNetworkErrorPolicy = ErrorPolicies {
epConErrorPolicies = []
}

type RemoteAddress = Socket.SockAddr
type RemoteAddress = (Socket.SockAddr, Void)

instance ShowProxy RemoteAddress where
showProxy _ = "SockAddr"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE NamedFieldPuns #-}
Expand All @@ -16,7 +17,8 @@ import GHC.Stack (HasCallStack)

import Control.Applicative (Alternative)
import Control.Concurrent.JobPool (Job (..))
import Control.Exception (SomeException)
import Control.Exception (SomeException (..), SomeAsyncException (..))
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadSTM
import Control.Monad.Class.MonadTime.SI
import System.Random (randomR)
Expand Down Expand Up @@ -59,6 +61,7 @@ import Ouroboros.Network.PeerSelection.State.LocalRootPeers qualified as LocalRo
belowTarget :: forall peeraddr peerconn m.
( Alternative (STM m)
, MonadSTM m
, MonadCatch m
, Ord peeraddr
)
=> PeerSelectionActions peeraddr peerconn m
Expand All @@ -70,7 +73,7 @@ belowTarget = belowTargetBigLedgerPeers <> belowTargetLocal <> belowTargetOther
-- configuration.
--
belowTargetLocal :: forall peeraddr peerconn m.
(MonadSTM m, Ord peeraddr, HasCallStack)
(MonadSTM m, MonadCatch m, Ord peeraddr, HasCallStack)
=> PeerSelectionActions peeraddr peerconn m
-> MkGuardedDecision peeraddr peerconn m
belowTargetLocal actions
Expand Down Expand Up @@ -164,7 +167,7 @@ belowTargetLocal actions


belowTargetOther :: forall peeraddr peerconn m.
(MonadSTM m, Ord peeraddr, HasCallStack)
(MonadSTM m, MonadCatch m, Ord peeraddr, HasCallStack)
=> PeerSelectionActions peeraddr peerconn m
-> MkGuardedDecision peeraddr peerconn m
belowTargetOther actions
Expand Down Expand Up @@ -246,7 +249,7 @@ belowTargetOther actions
-- state) then this monitoring action will be disabled.
--
belowTargetBigLedgerPeers :: forall peeraddr peerconn m.
(MonadSTM m, Ord peeraddr, HasCallStack)
(MonadSTM m, MonadCatch m, Ord peeraddr, HasCallStack)
=> PeerSelectionActions peeraddr peerconn m
-> MkGuardedDecision peeraddr peerconn m
belowTargetBigLedgerPeers actions
Expand Down Expand Up @@ -338,19 +341,20 @@ maxColdPeerRetryBackoff = 5


jobPromoteColdPeer :: forall peeraddr peerconn m.
(Monad m, Ord peeraddr)
(MonadCatch m, Ord peeraddr)
=> PeerSelectionActions peeraddr peerconn m
-> PeerSelectionPolicy peeraddr m
-> peeraddr
-> IsBigLedgerPeer
-> Job () m (Completion m peeraddr peerconn)
jobPromoteColdPeer PeerSelectionActions {
peerStateActions = PeerStateActions {establishPeerConnection},
peerConnToPeerSharing
peerConnToPeerSharing,
srvPeer
}
PeerSelectionPolicy { policyPeerShareActivationDelay }
peeraddr isBigLedgerPeer =
Job job handler () "promoteColdPeer"
Job (job peeraddr) handler () "promoteColdPeer"
where
handler :: SomeException -> m (Completion m peeraddr peerconn)
handler e = return $
Expand Down Expand Up @@ -404,72 +408,86 @@ jobPromoteColdPeer PeerSelectionActions {
decisionJobs = []
}

job :: m (Completion m peeraddr peerconn)
job = do
f e = case e of
err | Just (SomeException _) <- fromException e -> Just e
err | Just (SomeAsyncException {}) <- fromException e -> Just e

job :: peeraddr -> m (Completion m peeraddr peerconn)
job peeraddr' = do
--TODO: decide if we should do timeouts here or if we should make that
-- the responsibility of establishPeerConnection
peerconn <- establishPeerConnection isBigLedgerPeer peeraddr
let !peerSharing = peerConnToPeerSharing peerconn

return $ Completion $ \st@PeerSelectionState {
publicRootPeers,
establishedPeers,
knownPeers,
targets = PeerSelectionTargets {
targetNumberOfEstablishedPeers,
targetNumberOfEstablishedBigLedgerPeers
}
}
now ->
let psTime = case peerSharing of
PeerSharingEnabled -> Just (addTime policyPeerShareActivationDelay now)
PeerSharingDisabled -> Nothing
establishedPeers' = EstablishedPeers.insert peeraddr peerconn psTime establishedPeers
advertise = case peerSharing of
PeerSharingEnabled -> DoAdvertisePeer
PeerSharingDisabled -> DoNotAdvertisePeer
-- Update PeerSharing value in KnownPeers
knownPeers' = KnownPeers.alter
(\x -> case x of
Nothing ->
KnownPeers.alterKnownPeerInfo
(Just peerSharing, Just advertise)
x
Just _ ->
KnownPeers.alterKnownPeerInfo
(Just peerSharing, Nothing)
x
)
(Set.singleton peeraddr)
$ KnownPeers.setSuccessfulConnectionFlag (Set.singleton peeraddr)
$ KnownPeers.clearTepidFlag peeraddr $
KnownPeers.resetFailCount
peeraddr
knownPeers
bigLedgerPeersSet = PublicRootPeers.getBigLedgerPeers publicRootPeers

st' = st { establishedPeers = establishedPeers',
inProgressPromoteCold = Set.delete peeraddr
(inProgressPromoteCold st),
knownPeers = knownPeers'
}
cs' = peerSelectionStateToCounters st'

in Decision {
decisionTrace = if peeraddr `Set.member` bigLedgerPeersSet
then [TracePromoteColdBigLedgerPeerDone
targetNumberOfEstablishedBigLedgerPeers
(case cs' of
PeerSelectionCounters { numberOfEstablishedBigLedgerPeers = a } -> a)
peeraddr]
else [TracePromoteColdDone
targetNumberOfEstablishedPeers
(case cs' of
PeerSelectionCounters { numberOfEstablishedPeers = a } -> a)
peeraddr],
decisionState = st',
decisionJobs = []
}
res <- tryJust f do
establishPeerConnection isBigLedgerPeer peeraddr'
case res of
Left e -> do
alternative <- srvPeer peeraddr
case alternative of
Just pa -> job pa -- ^ probably should complete first to remove
-- the previous peeraddr from known and in-progress
-- set and add the new one instead and re-spin the job
Nothing -> throwIO e
Right peerconn -> do
let !peerSharing = peerConnToPeerSharing peerconn

return $ Completion $ \st@PeerSelectionState {
publicRootPeers,
establishedPeers,
knownPeers,
targets = PeerSelectionTargets {
targetNumberOfEstablishedPeers,
targetNumberOfEstablishedBigLedgerPeers
}
}
now ->
let psTime = case peerSharing of
PeerSharingEnabled -> Just (addTime policyPeerShareActivationDelay now)
PeerSharingDisabled -> Nothing
establishedPeers' = EstablishedPeers.insert peeraddr peerconn psTime establishedPeers
advertise = case peerSharing of
PeerSharingEnabled -> DoAdvertisePeer
PeerSharingDisabled -> DoNotAdvertisePeer
-- Update PeerSharing value in KnownPeers
knownPeers' = KnownPeers.alter
(\x -> case x of
Nothing ->
KnownPeers.alterKnownPeerInfo
(Just peerSharing, Just advertise)
x
Just _ ->
KnownPeers.alterKnownPeerInfo
(Just peerSharing, Nothing)
x
)
(Set.singleton peeraddr)
$ KnownPeers.setSuccessfulConnectionFlag (Set.singleton peeraddr)
$ KnownPeers.clearTepidFlag peeraddr $
KnownPeers.resetFailCount
peeraddr
knownPeers
bigLedgerPeersSet = PublicRootPeers.getBigLedgerPeers publicRootPeers

st' = st { establishedPeers = establishedPeers',
inProgressPromoteCold = Set.delete peeraddr
(inProgressPromoteCold st),
knownPeers = knownPeers'
}
cs' = peerSelectionStateToCounters st'

in Decision {
decisionTrace = if peeraddr `Set.member` bigLedgerPeersSet
then [TracePromoteColdBigLedgerPeerDone
targetNumberOfEstablishedBigLedgerPeers
(case cs' of
PeerSelectionCounters { numberOfEstablishedBigLedgerPeers = a } -> a)
peeraddr]
else [TracePromoteColdDone
targetNumberOfEstablishedPeers
(case cs' of
PeerSelectionCounters { numberOfEstablishedPeers = a } -> a)
peeraddr],
decisionState = st',
decisionJobs = []
}


---------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,9 @@ data PeerSelectionActions peeraddr peerconn m = PeerSelectionActions {

-- | Read the current state of ledger peer snapshot
--
readLedgerPeerSnapshot :: STM m (Maybe LedgerPeerSnapshot)
readLedgerPeerSnapshot :: STM m (Maybe LedgerPeerSnapshot),

srvPeer :: peeraddr -> m (Maybe peeraddr)
}

-- | Interfaces required by the peer selection governor, which do not need to
Expand Down Expand Up @@ -789,7 +791,7 @@ data PeerSelectionView a = PeerSelectionView {

--
-- Non-Root Peers
--
--

viewKnownNonRootPeers :: a,
-- ^ number of known non root peers. These are mostly peers received
Expand Down Expand Up @@ -1770,4 +1772,3 @@ deriving instance (Ord peeraddr, Show peeraddr)

data ChurnMode = ChurnModeBulkSync
| ChurnModeNormal deriving Show

Loading

0 comments on commit 3fce719

Please sign in to comment.