Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DONT MERGE: Not leak memory #1572

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 15 additions & 14 deletions hydra-node/src/Hydra/API/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import Hydra.Prelude hiding (TVar, readTVar, seq)
import Cardano.Ledger.Core (PParams)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM.TChan (newBroadcastTChanIO, writeTChan)
import Control.Concurrent.STM.TVar (modifyTVar', newTVarIO)
import Control.Concurrent.STM.TVar (modifyTVar', newTVarIO, readTVar)
import Control.Exception (IOException)
import Hydra.API.APIServerLog (APIServerLog (..))
import Hydra.API.ClientInput (ClientInput)
Expand All @@ -21,7 +21,7 @@ import Hydra.API.ServerOutput (
projectInitializingHeadId,
projectSnapshotUtxo,
)
import Hydra.API.WSServer (nextSequenceNumber, wsApp)
import Hydra.API.WSServer (wsApp)
import Hydra.Cardano.Api (LedgerEra)
import Hydra.Chain (Chain (..), IsChainState)
import Hydra.Chain.Direct.State ()
Expand Down Expand Up @@ -79,16 +79,19 @@ withAPIServer ::
withAPIServer config party persistence tracer chain pparams callback action =
handle onIOException $ do
responseChannel <- newBroadcastTChanIO
-- Intialize our read models from stored events
-- NOTE: we do not keep the stored events around in memory
timedOutputEvents <- loadAll

-- Intialize our read model from stored events
headStatusP <- mkProjection Idle (output <$> timedOutputEvents) projectHeadStatus
snapshotUtxoP <- mkProjection Nothing (output <$> timedOutputEvents) projectSnapshotUtxo
headIdP <- mkProjection Nothing (output <$> timedOutputEvents) projectInitializingHeadId

-- NOTE: we need to reverse the list because we store history in a reversed
-- list in memory but in order on disk
history <- newTVarIO (reverse timedOutputEvents)
nextSeqVar <- newTVarIO 0
let nextSeq = do
seq <- readTVar nextSeqVar
modifyTVar' nextSeqVar (+ 1)
pure seq

(notifyServerRunning, waitForServerRunning) <- setupServerNotification

let serverSettings =
Expand All @@ -105,15 +108,15 @@ withAPIServer config party persistence tracer chain pparams callback action =
. simpleCors
$ websocketsOr
defaultConnectionOptions
(wsApp party tracer history callback headStatusP snapshotUtxoP responseChannel)
(wsApp party tracer nextSeq callback headStatusP snapshotUtxoP responseChannel)
(httpApp tracer chain pparams (atomically $ getLatest headIdP) (atomically $ getLatest snapshotUtxoP) callback)
)
( do
waitForServerRunning
action $
Server
{ sendOutput = \output -> do
timedOutput <- appendToHistory history output
timedOutput <- persistOutput nextSeq output
atomically $ do
update headStatusP output
update snapshotUtxoP output
Expand All @@ -138,13 +141,11 @@ withAPIServer config party persistence tracer chain pparams callback action =
_ ->
runSettings settings app

appendToHistory history output = do
persistOutput nextSeq output = do
time <- getCurrentTime
timedOutput <- atomically $ do
seq <- nextSequenceNumber history
let timedOutput = TimedServerOutput{output, time, seq}
modifyTVar' history (timedOutput :)
pure timedOutput
seq <- nextSeq
pure TimedServerOutput{output, time, seq}
append timedOutput
pure timedOutput

Expand Down
42 changes: 19 additions & 23 deletions hydra-node/src/Hydra/API/WSServer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ wsApp ::
IsChainState tx =>
Party ->
Tracer IO APIServerLog ->
TVar [TimedServerOutput tx] ->
-- | Get next sequence number.
STM IO Natural ->
(ClientInput tx -> IO ()) ->
-- | Read model to enhance 'Greetings' messages with 'HeadStatus'.
Projection STM.STM (ServerOutput tx) HeadStatus ->
Expand All @@ -58,18 +59,19 @@ wsApp ::
TChan (TimedServerOutput tx) ->
PendingConnection ->
IO ()
wsApp party tracer history callback headStatusP snapshotUtxoP responseChannel pending = do
wsApp party tracer nextSeq callback headStatusP snapshotUtxoP responseChannel pending = do
traceWith tracer NewAPIConnection
let path = requestPath $ pendingRequest pending
queryParams <- uriQuery <$> mkURIBs path
con <- acceptRequest pending
chan <- STM.atomically $ dupTChan responseChannel

-- FIXME: No support of history forwarding anymore (disabled because of memory growing too much)
-- api client can decide if they want to see the past history of server outputs
unless (shouldNotServeHistory queryParams) $
forwardHistory con
-- unless (shouldNotServeHistory queryParams) $
-- forwardHistory con

forwardGreetingOnly con
sendGreetings con

let outConfig = mkServerOutputConfig queryParams

Expand All @@ -79,8 +81,8 @@ wsApp party tracer history callback headStatusP snapshotUtxoP responseChannel pe
-- NOTE: We will add a 'Greetings' message on each API server start. This is
-- important to make sure the latest configured 'party' is reaching the
-- client.
forwardGreetingOnly con = do
seq <- atomically $ nextSequenceNumber history
sendGreetings con = do
seq <- atomically nextSeq
headStatus <- atomically getLatestHeadStatus
snapshotUtxo <- atomically getLatestSnapshotUtxo
time <- getCurrentTime
Expand Down Expand Up @@ -114,11 +116,11 @@ wsApp party tracer history callback headStatusP snapshotUtxoP responseChannel pe
queryP = QueryParam k v
in if queryP `elem` qp then WithoutUTxO else WithUTxO

shouldNotServeHistory qp =
flip any qp $ \case
(QueryParam key val)
| key == [queryKey|history|] -> val == [queryValue|no|]
_other -> False
-- shouldNotServeHistory qp =
-- flip any qp $ \case
-- (QueryParam key val)
-- | key == [queryKey|history|] -> val == [queryValue|no|]
-- _other -> False

sendOutputs chan con outConfig = forever $ do
response <- STM.atomically $ readTChan chan
Expand All @@ -139,18 +141,12 @@ wsApp party tracer history callback headStatusP snapshotUtxoP responseChannel pe
-- message to memory
let clientInput = decodeUtf8With lenientDecode $ toStrict msg
time <- getCurrentTime
seq <- atomically $ nextSequenceNumber history
seq <- atomically nextSeq
let timedOutput = TimedServerOutput{output = InvalidInput @tx e clientInput, time, seq}
sendTextData con $ Aeson.encode timedOutput
traceWith tracer (APIInvalidInput e clientInput)

forwardHistory con = do
hist <- STM.atomically (readTVar history)
let encodeAndReverse xs serverOutput = Aeson.encode serverOutput : xs
sendTextDatas con $ foldl' encodeAndReverse [] hist

nextSequenceNumber :: TVar [TimedServerOutput tx] -> STM.STM Natural
nextSequenceNumber historyList =
STM.readTVar historyList >>= \case
[] -> pure 0
(TimedServerOutput{seq} : _) -> pure (seq + 1)
-- forwardHistory con = do
-- hist <- STM.atomically (readTVar history)
-- let encodeAndReverse xs serverOutput = Aeson.encode serverOutput : xs
-- sendTextDatas con $ foldl' encodeAndReverse [] hist
20 changes: 11 additions & 9 deletions hydra-node/src/Hydra/Network/Reliability.hs
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,13 @@ import Cardano.Binary (serialize')
import Cardano.Crypto.Util (SignableRepresentation (getSignableRepresentation))
import Control.Concurrent.Class.MonadSTM (
MonadSTM (readTQueue, writeTQueue),
modifyTVar',
newTQueueIO,
newTVarIO,
readTVarIO,
writeTVar,
)
import Control.Tracer (Tracer)
import Data.IntMap qualified as IMap
import Data.Sequence.Strict ((|>))
import Data.Sequence.Strict qualified as Seq
import Data.Vector (
Vector,
Expand Down Expand Up @@ -225,7 +223,7 @@ withReliability ::
-- | Underlying network component providing consuming and sending channels.
NetworkComponent m (Authenticated (ReliableMsg (Heartbeat inbound))) (ReliableMsg (Heartbeat outbound)) a ->
NetworkComponent m (Authenticated (Heartbeat inbound)) (Heartbeat outbound) a
withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loadMessages} me otherParties withRawNetwork callback action = do
withReliability tracer MessagePersistence{saveAcks, loadAcks, loadMessages} me otherParties withRawNetwork callback action = do
acksCache <- loadAcks >>= newTVarIO
sentMessages <- loadMessages >>= newTVarIO . Seq.fromList
resendQ <- newTQueueIO
Expand All @@ -236,15 +234,19 @@ withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loa
reliableBroadcast sentMessages ourIndex acksCache network
where
allParties = fromList $ sort $ me : otherParties
reliableBroadcast sentMessages ourIndex acksCache Network{broadcast} =

reliableBroadcast _sentMessages ourIndex acksCache Network{broadcast} =
action $
Network
{ broadcast = \msg ->
case msg of
Data{} -> do
localCounter <- atomically $ cacheMessage msg >> incrementAckCounter
saveAcks localCounter
appendMessage msg
-- FIXME: No outbound message cache and persistence, resending will be broken
localCounter <- atomically $ do
-- cacheMessage msg
incrementAckCounter
-- saveAcks localCounter
-- appendMessage msg
traceWith tracer BroadcastCounter{ourIndex, localCounter}
broadcast $ ReliableMsg localCounter msg
Ping{} -> do
Expand All @@ -260,8 +262,8 @@ withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loa
writeTVar acksCache newAcks
pure newAcks

cacheMessage msg =
modifyTVar' sentMessages (|> msg)
-- cacheMessage msg =
-- modifyTVar' sentMessages (|> msg)

reliableCallback acksCache sentMessages resend ourIndex (Authenticated (ReliableMsg acknowledged payload) party) = do
if length acknowledged /= length allParties
Expand Down
Loading