From 7c6fbf093c9d065b631abf69ad8f019c73887ec4 Mon Sep 17 00:00:00 2001 From: Sebastian Nagel Date: Sat, 17 Aug 2024 13:11:29 +0200 Subject: [PATCH 1/3] Identify two locations with potentially always growing memory usage --- hydra-node/src/Hydra/API/Server.hs | 1 + hydra-node/src/Hydra/Network/Reliability.hs | 1 + 2 files changed, 2 insertions(+) diff --git a/hydra-node/src/Hydra/API/Server.hs b/hydra-node/src/Hydra/API/Server.hs index 600be258f1d..00df08b288d 100644 --- a/hydra-node/src/Hydra/API/Server.hs +++ b/hydra-node/src/Hydra/API/Server.hs @@ -88,6 +88,7 @@ withAPIServer config party persistence tracer chain pparams callback action = -- NOTE: we need to reverse the list because we store history in a reversed -- list in memory but in order on disk + -- FIXME: always growing history <- newTVarIO (reverse timedOutputEvents) (notifyServerRunning, waitForServerRunning) <- setupServerNotification diff --git a/hydra-node/src/Hydra/Network/Reliability.hs b/hydra-node/src/Hydra/Network/Reliability.hs index 42d6c82f439..7fa03810d93 100644 --- a/hydra-node/src/Hydra/Network/Reliability.hs +++ b/hydra-node/src/Hydra/Network/Reliability.hs @@ -227,6 +227,7 @@ withReliability :: NetworkComponent m (Authenticated (Heartbeat inbound)) (Heartbeat outbound) a withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loadMessages} me otherParties withRawNetwork callback action = do acksCache <- loadAcks >>= newTVarIO + -- FIXME: always growing sentMessages <- loadMessages >>= newTVarIO . 
Seq.fromList resendQ <- newTQueueIO let ourIndex = fromMaybe (error "This cannot happen because we constructed the list with our party inside.") (findPartyIndex me) From a48146bf2ece9c8752deb5f3676ac0b97f2c2a72 Mon Sep 17 00:00:00 2001 From: Sebastian Nagel Date: Sat, 17 Aug 2024 13:34:54 +0200 Subject: [PATCH 2/3] Crudely disable caching of outbound messages --- hydra-node/src/Hydra/Network/Reliability.hs | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/hydra-node/src/Hydra/Network/Reliability.hs b/hydra-node/src/Hydra/Network/Reliability.hs index 7fa03810d93..924b41b76d4 100644 --- a/hydra-node/src/Hydra/Network/Reliability.hs +++ b/hydra-node/src/Hydra/Network/Reliability.hs @@ -86,7 +86,6 @@ import Cardano.Binary (serialize') import Cardano.Crypto.Util (SignableRepresentation (getSignableRepresentation)) import Control.Concurrent.Class.MonadSTM ( MonadSTM (readTQueue, writeTQueue), - modifyTVar', newTQueueIO, newTVarIO, readTVarIO, @@ -94,7 +93,6 @@ import Control.Concurrent.Class.MonadSTM ( ) import Control.Tracer (Tracer) import Data.IntMap qualified as IMap -import Data.Sequence.Strict ((|>)) import Data.Sequence.Strict qualified as Seq import Data.Vector ( Vector, @@ -225,9 +223,8 @@ withReliability :: -- | Underlying network component providing consuming and sending channels. NetworkComponent m (Authenticated (ReliableMsg (Heartbeat inbound))) (ReliableMsg (Heartbeat outbound)) a -> NetworkComponent m (Authenticated (Heartbeat inbound)) (Heartbeat outbound) a -withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loadMessages} me otherParties withRawNetwork callback action = do +withReliability tracer MessagePersistence{saveAcks, loadAcks, loadMessages} me otherParties withRawNetwork callback action = do acksCache <- loadAcks >>= newTVarIO - -- FIXME: always growing sentMessages <- loadMessages >>= newTVarIO . 
Seq.fromList resendQ <- newTQueueIO let ourIndex = fromMaybe (error "This cannot happen because we constructed the list with our party inside.") (findPartyIndex me) @@ -237,15 +234,19 @@ withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loa reliableBroadcast sentMessages ourIndex acksCache network where allParties = fromList $ sort $ me : otherParties - reliableBroadcast sentMessages ourIndex acksCache Network{broadcast} = + + reliableBroadcast _sentMessages ourIndex acksCache Network{broadcast} = action $ Network { broadcast = \msg -> case msg of Data{} -> do - localCounter <- atomically $ cacheMessage msg >> incrementAckCounter - saveAcks localCounter - appendMessage msg + -- FIXME: No outbound message cache and persistence, resending will be broken + localCounter <- atomically $ do + -- cacheMessage msg + incrementAckCounter + -- saveAcks localCounter + -- appendMessage msg traceWith tracer BroadcastCounter{ourIndex, localCounter} broadcast $ ReliableMsg localCounter msg Ping{} -> do @@ -261,8 +262,8 @@ withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loa writeTVar acksCache newAcks pure newAcks - cacheMessage msg = - modifyTVar' sentMessages (|> msg) + -- cacheMessage msg = + -- modifyTVar' sentMessages (|> msg) reliableCallback acksCache sentMessages resend ourIndex (Authenticated (ReliableMsg acknowledged payload) party) = do if length acknowledged /= length allParties From 54fedc046d047b2ddad21208f2eee227a23b53c5 Mon Sep 17 00:00:00 2001 From: Sebastian Nagel Date: Sat, 17 Aug 2024 13:23:16 +0200 Subject: [PATCH 3/3] Crudely disable API server history --- hydra-node/src/Hydra/API/Server.hs | 30 ++++++++++---------- hydra-node/src/Hydra/API/WSServer.hs | 42 +++++++++++++--------------- 2 files changed, 34 insertions(+), 38 deletions(-) diff --git a/hydra-node/src/Hydra/API/Server.hs b/hydra-node/src/Hydra/API/Server.hs index 00df08b288d..a5d0bd35a60 100644 --- a/hydra-node/src/Hydra/API/Server.hs +++ 
b/hydra-node/src/Hydra/API/Server.hs @@ -7,7 +7,7 @@ import Hydra.Prelude hiding (TVar, readTVar, seq) import Cardano.Ledger.Core (PParams) import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar) import Control.Concurrent.STM.TChan (newBroadcastTChanIO, writeTChan) -import Control.Concurrent.STM.TVar (modifyTVar', newTVarIO) +import Control.Concurrent.STM.TVar (modifyTVar', newTVarIO, readTVar) import Control.Exception (IOException) import Hydra.API.APIServerLog (APIServerLog (..)) import Hydra.API.ClientInput (ClientInput) @@ -21,7 +21,7 @@ import Hydra.API.ServerOutput ( projectInitializingHeadId, projectSnapshotUtxo, ) -import Hydra.API.WSServer (nextSequenceNumber, wsApp) +import Hydra.API.WSServer (wsApp) import Hydra.Cardano.Api (LedgerEra) import Hydra.Chain (Chain (..), IsChainState) import Hydra.Chain.Direct.State () @@ -79,17 +79,19 @@ withAPIServer :: withAPIServer config party persistence tracer chain pparams callback action = handle onIOException $ do responseChannel <- newBroadcastTChanIO + -- Initialize our read models from stored events + -- NOTE: we do not keep the stored events around in memory timedOutputEvents <- loadAll - - -- Intialize our read model from stored events headStatusP <- mkProjection Idle (output <$> timedOutputEvents) projectHeadStatus snapshotUtxoP <- mkProjection Nothing (output <$> timedOutputEvents) projectSnapshotUtxo headIdP <- mkProjection Nothing (output <$> timedOutputEvents) projectInitializingHeadId - -- NOTE: we need to reverse the list because we store history in a reversed - -- list in memory but in order on disk - -- FIXME: always growing - history <- newTVarIO (reverse timedOutputEvents) + nextSeqVar <- newTVarIO 0 + let nextSeq = do + seq <- readTVar nextSeqVar + modifyTVar' nextSeqVar (+ 1) + pure seq + (notifyServerRunning, waitForServerRunning) <- setupServerNotification let serverSettings = @@ -106,7 +108,7 @@ withAPIServer config party persistence tracer chain pparams callback action = .
simpleCors $ websocketsOr defaultConnectionOptions - (wsApp party tracer history callback headStatusP snapshotUtxoP responseChannel) + (wsApp party tracer nextSeq callback headStatusP snapshotUtxoP responseChannel) (httpApp tracer chain pparams (atomically $ getLatest headIdP) (atomically $ getLatest snapshotUtxoP) callback) ) ( do @@ -114,7 +116,7 @@ withAPIServer config party persistence tracer chain pparams callback action = action $ Server { sendOutput = \output -> do - timedOutput <- appendToHistory history output + timedOutput <- persistOutput nextSeq output atomically $ do update headStatusP output update snapshotUtxoP output @@ -139,13 +141,11 @@ withAPIServer config party persistence tracer chain pparams callback action = _ -> runSettings settings app - appendToHistory history output = do + persistOutput nextSeq output = do time <- getCurrentTime timedOutput <- atomically $ do - seq <- nextSequenceNumber history - let timedOutput = TimedServerOutput{output, time, seq} - modifyTVar' history (timedOutput :) - pure timedOutput + seq <- nextSeq + pure TimedServerOutput{output, time, seq} append timedOutput pure timedOutput diff --git a/hydra-node/src/Hydra/API/WSServer.hs b/hydra-node/src/Hydra/API/WSServer.hs index 205a74faae7..841357c0bd4 100644 --- a/hydra-node/src/Hydra/API/WSServer.hs +++ b/hydra-node/src/Hydra/API/WSServer.hs @@ -49,7 +49,8 @@ wsApp :: IsChainState tx => Party -> Tracer IO APIServerLog -> - TVar [TimedServerOutput tx] -> + -- | Get next sequence number. + STM IO Natural -> (ClientInput tx -> IO ()) -> -- | Read model to enhance 'Greetings' messages with 'HeadStatus'. 
Projection STM.STM (ServerOutput tx) HeadStatus -> @@ -58,18 +59,19 @@ wsApp :: TChan (TimedServerOutput tx) -> PendingConnection -> IO () -wsApp party tracer history callback headStatusP snapshotUtxoP responseChannel pending = do +wsApp party tracer nextSeq callback headStatusP snapshotUtxoP responseChannel pending = do traceWith tracer NewAPIConnection let path = requestPath $ pendingRequest pending queryParams <- uriQuery <$> mkURIBs path con <- acceptRequest pending chan <- STM.atomically $ dupTChan responseChannel + -- FIXME: No support of history forwarding anymore (disabled because of memory growing too much) -- api client can decide if they want to see the past history of server outputs - unless (shouldNotServeHistory queryParams) $ - forwardHistory con + -- unless (shouldNotServeHistory queryParams) $ + -- forwardHistory con - forwardGreetingOnly con + sendGreetings con let outConfig = mkServerOutputConfig queryParams @@ -79,8 +81,8 @@ wsApp party tracer history callback headStatusP snapshotUtxoP responseChannel pe -- NOTE: We will add a 'Greetings' message on each API server start. This is -- important to make sure the latest configured 'party' is reaching the -- client. 
- forwardGreetingOnly con = do - seq <- atomically $ nextSequenceNumber history + sendGreetings con = do + seq <- atomically nextSeq headStatus <- atomically getLatestHeadStatus snapshotUtxo <- atomically getLatestSnapshotUtxo time <- getCurrentTime @@ -114,11 +116,11 @@ wsApp party tracer history callback headStatusP snapshotUtxoP responseChannel pe queryP = QueryParam k v in if queryP `elem` qp then WithoutUTxO else WithUTxO - shouldNotServeHistory qp = - flip any qp $ \case - (QueryParam key val) - | key == [queryKey|history|] -> val == [queryValue|no|] - _other -> False + -- shouldNotServeHistory qp = + -- flip any qp $ \case + -- (QueryParam key val) + -- | key == [queryKey|history|] -> val == [queryValue|no|] + -- _other -> False sendOutputs chan con outConfig = forever $ do response <- STM.atomically $ readTChan chan @@ -139,18 +141,12 @@ wsApp party tracer history callback headStatusP snapshotUtxoP responseChannel pe -- message to memory let clientInput = decodeUtf8With lenientDecode $ toStrict msg time <- getCurrentTime - seq <- atomically $ nextSequenceNumber history + seq <- atomically nextSeq let timedOutput = TimedServerOutput{output = InvalidInput @tx e clientInput, time, seq} sendTextData con $ Aeson.encode timedOutput traceWith tracer (APIInvalidInput e clientInput) - forwardHistory con = do - hist <- STM.atomically (readTVar history) - let encodeAndReverse xs serverOutput = Aeson.encode serverOutput : xs - sendTextDatas con $ foldl' encodeAndReverse [] hist - -nextSequenceNumber :: TVar [TimedServerOutput tx] -> STM.STM Natural -nextSequenceNumber historyList = - STM.readTVar historyList >>= \case - [] -> pure 0 - (TimedServerOutput{seq} : _) -> pure (seq + 1) + -- forwardHistory con = do + -- hist <- STM.atomically (readTVar history) + -- let encodeAndReverse xs serverOutput = Aeson.encode serverOutput : xs + -- sendTextDatas con $ foldl' encodeAndReverse [] hist