From 749d77943de6de9254ad3bea8a5a3e840661f4e0 Mon Sep 17 00:00:00 2001 From: Jonathan Fung <121899091+jonfung-dydx@users.noreply.github.com> Date: Thu, 8 Aug 2024 16:00:38 -0400 Subject: [PATCH] fix merge conflict and metric emissions (#2065) --- .../streaming/full_node_streaming_manager.go | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index b2f4d24aba..473b611b9c 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -350,7 +350,7 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates( clobPairIds = append(clobPairIds, clobPairId) } - sm.AddUpdatesToCache(streamUpdates, clobPairIds, uint32(len(updates))) + sm.AddUpdatesToCache(streamUpdates, clobPairIds) } // SendOrderbookFillUpdates groups fills by their clob pair ids and @@ -393,7 +393,7 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates( clobPairIds = append(clobPairIds, clobPairId) } - sm.AddUpdatesToCache(streamUpdates, clobPairIds, uint32(len(orderbookFills))) + sm.AddUpdatesToCache(streamUpdates, clobPairIds) } // SendTakerOrderStatus sends out a taker order and its status to the full node streaming service. @@ -411,30 +411,31 @@ func (sm *FullNodeStreamingManagerImpl) SendTakerOrderStatus( } sm.AddUpdatesToCache( - map[uint32][]clobtypes.StreamUpdate{ - clobPairId: { - { - UpdateMessage: &clobtypes.StreamUpdate_TakerOrder{ - TakerOrder: &streamTakerOrder, - }, + []clobtypes.StreamUpdate{ + { + UpdateMessage: &clobtypes.StreamUpdate_TakerOrder{ + TakerOrder: &streamTakerOrder, }, + BlockHeight: blockHeight, + ExecMode: uint32(execMode), }, }, - 1, + []uint32{clobPairId}, ) } +// AddUpdatesToCache adds a series of updates to the full node streaming cache. +// Clob pair ids are the clob pair id each update is relevant to. func (sm *FullNodeStreamingManagerImpl) AddUpdatesToCache( updates []clobtypes.StreamUpdate, clobPairIds []uint32, - numUpdatesToAdd uint32, ) { sm.Lock() defer sm.Unlock() metrics.IncrCounter( metrics.GrpcAddUpdateToBufferCount, - float32(numUpdatesToAdd), + float32(len(updates)), ) sm.streamUpdateCache = append(sm.streamUpdateCache, updates...)