From e1b4f78dc3d50be6264a89162ca6f95eb527c945 Mon Sep 17 00:00:00 2001 From: Jonathan Fung Date: Mon, 14 Oct 2024 13:46:57 -0400 Subject: [PATCH] few more metrics emissions, tag some metrics by subscription ids, remove unused code --- protocol/lib/metrics/metric_keys.go | 4 +- .../streaming/full_node_streaming_manager.go | 57 ++++++++----------- protocol/streaming/noop_streaming_manager.go | 7 --- protocol/streaming/types/interface.go | 5 -- protocol/x/clob/types/expected_keepers.go | 4 -- protocol/x/subaccounts/keeper/subaccount.go | 16 ------ protocol/x/subaccounts/types/types.go | 4 -- 7 files changed, 27 insertions(+), 70 deletions(-) diff --git a/protocol/lib/metrics/metric_keys.go b/protocol/lib/metrics/metric_keys.go index ac8e8a17d1e..ead248f516a 100644 --- a/protocol/lib/metrics/metric_keys.go +++ b/protocol/lib/metrics/metric_keys.go @@ -69,9 +69,8 @@ const ( FullNodeGrpc = "full_node_grpc" GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency" GrpcSendOrderbookSnapshotLatency = "grpc_send_orderbook_snapshot_latency" - GrpcSendSubaccountSnapshotLatency = "grpc_send_subaccount_snapshot_latency" + GrpcSendSubaccountUpdateCount = "grpc_send_subaccount_update_count" GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency" - GrpcSendFinalizedSubaccountUpdatesLatency = "grpc_send_finalized_subaccount_updates_latency" GrpcAddUpdateToBufferCount = "grpc_add_update_to_buffer_count" GrpcAddToSubscriptionChannelCount = "grpc_add_to_subscription_channel_count" GrpcSendResponseToSubscriberCount = "grpc_send_response_to_subscriber_count" @@ -82,6 +81,7 @@ const ( GrpcStagedAllFinalizeBlockUpdatesCount = "grpc_staged_all_finalize_block_updates_count" GrpcStagedFillFinalizeBlockUpdatesCount = "grpc_staged_finalize_block_fill_updates_count" GrpcStagedSubaccountFinalizeBlockUpdatesCount = "grpc_staged_finalize_block_subaccount_updates_count" + SubscriptionId = "subscription_id" EndBlocker = "end_blocker" EndBlockerLag = "end_blocker_lag" diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index 85c265f12e7..5aa9e6fc207 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -145,7 +145,7 @@ func (sm *FullNodeStreamingManagerImpl) Enabled() bool { } func (sm *FullNodeStreamingManagerImpl) EmitMetrics() { - metrics.SetGauge( + metrics.AddSample( metrics.GrpcStreamNumUpdatesBuffered, float32(len(sm.streamUpdateCache)), ) @@ -154,9 +154,10 @@ func (sm *FullNodeStreamingManagerImpl) EmitMetrics() { float32(len(sm.orderbookSubscriptions)), ) for _, subscription := range sm.orderbookSubscriptions { - metrics.AddSample( + metrics.AddSampleWithLabels( metrics.GrpcSubscriptionChannelLength, float32(len(subscription.updatesChannel)), + metrics.GetLabelForIntValue(metrics.SubscriptionId, int(subscription.subscriptionId)), ) } } @@ -226,9 +227,10 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe( // Use current goroutine to consistently poll subscription channel for updates // to send through stream. for updates := range subscription.updatesChannel { - metrics.IncrCounter( + metrics.IncrCounterWithLabels( metrics.GrpcSendResponseToSubscriberCount, 1, + metrics.GetLabelForIntValue(metrics.SubscriptionId, int(subscription.subscriptionId)), ) err = subscription.messageSender.Send( &clobtypes.StreamOrderbookUpdatesResponse{ @@ -364,9 +366,17 @@ func (sm *FullNodeStreamingManagerImpl) sendStreamUpdates( return } + metrics.IncrCounterWithLabels( + metrics.GrpcAddToSubscriptionChannelCount, + 1, + metrics.GetLabelForIntValue(metrics.SubscriptionId, int(subscriptionId)), + ) + select { case subscription.updatesChannel <- streamUpdates: default: + // Buffer is full. Emit metric and drop subscription. + sm.EmitMetrics() sm.logger.Error( fmt.Sprintf( "Streaming subscription id %+v channel full capacity. Dropping subscription connection.", @@ -399,6 +409,11 @@ func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate( return } + metrics.IncrCounter( + metrics.GrpcSendSubaccountUpdateCount, + 1, + ) + // If `DeliverTx`, updates should be staged to be streamed after consensus finalizes on a block. stagedEvent := clobtypes.StagedFinalizeBlockEvent{ Event: &clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate{ @@ -710,32 +725,6 @@ func getStreamUpdatesForSubaccountUpdates( return streamUpdates, subaccountIds } -// SendFinalizedSubaccountUpdates groups subaccount updates by their subaccount ids and -// sends messages to the subscribers. -func (sm *FullNodeStreamingManagerImpl) SendFinalizedSubaccountUpdates( - subaccountUpdates []satypes.StreamSubaccountUpdate, - blockHeight uint32, - execMode sdk.ExecMode, -) { - defer metrics.ModuleMeasureSince( - metrics.FullNodeGrpc, - metrics.GrpcSendFinalizedSubaccountUpdatesLatency, - time.Now(), - ) - - if execMode != sdk.ExecModeFinalize { - panic("SendFinalizedSubaccountUpdates should only be called in ExecModeFinalize") - } - - streamUpdates, subaccountIds := getStreamUpdatesForSubaccountUpdates( - subaccountUpdates, - blockHeight, - execMode, - ) - - sm.AddSubaccountUpdatesToCache(streamUpdates, subaccountIds) -} - // AddOrderUpdatesToCache adds a series of updates to the full node streaming cache. // Clob pair ids are the clob pair id each update is relevant to. func (sm *FullNodeStreamingManagerImpl) AddOrderUpdatesToCache( @@ -752,9 +741,9 @@ func (sm *FullNodeStreamingManagerImpl) AddOrderUpdatesToCache( sm.cacheStreamUpdatesByClobPairWithLock(updates, clobPairIds) + sm.EmitMetrics() // Remove all subscriptions and wipe the buffer if buffer overflows. sm.RemoveSubscriptionsAndClearBufferIfFull() - sm.EmitMetrics() } // AddSubaccountUpdatesToCache adds a series of updates to the full node streaming cache. @@ -773,8 +762,8 @@ func (sm *FullNodeStreamingManagerImpl) AddSubaccountUpdatesToCache( sm.cacheStreamUpdatesBySubaccountWithLock(updates, subaccountIds) - sm.RemoveSubscriptionsAndClearBufferIfFull() sm.EmitMetrics() + sm.RemoveSubscriptionsAndClearBufferIfFull() } // RemoveSubscriptionsAndClearBufferIfFull removes all subscriptions and wipes the buffer if buffer overflows. @@ -790,6 +779,7 @@ func (sm *FullNodeStreamingManagerImpl) RemoveSubscriptionsAndClearBufferIfFull( } sm.streamUpdateCache = nil sm.streamUpdateSubscriptionCache = nil + sm.EmitMetrics() } } @@ -825,13 +815,16 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() { // If the buffer is full, drop the subscription. for id, updates := range subscriptionUpdates { if subscription, ok := sm.orderbookSubscriptions[id]; ok { - metrics.IncrCounter( + metrics.IncrCounterWithLabels( metrics.GrpcAddToSubscriptionChannelCount, 1, + metrics.GetLabelForIntValue(metrics.SubscriptionId, int(id)), ) select { case subscription.updatesChannel <- updates: default: + // Buffer is full. Emit metric and drop subscription. + sm.EmitMetrics() idsToRemove = append(idsToRemove, id) } } diff --git a/protocol/streaming/noop_streaming_manager.go b/protocol/streaming/noop_streaming_manager.go index 9dc7bf6de9f..89250854c4c 100644 --- a/protocol/streaming/noop_streaming_manager.go +++ b/protocol/streaming/noop_streaming_manager.go @@ -48,13 +48,6 @@ func (sm *NoopGrpcStreamingManager) SendTakerOrderStatus( ) { } -func (sm *NoopGrpcStreamingManager) SendFinalizedSubaccountUpdates( - subaccountUpdates []satypes.StreamSubaccountUpdate, - blockHeight uint32, - execMode sdk.ExecMode, -) { -} - func (sm *NoopGrpcStreamingManager) TracksSubaccountId(id satypes.SubaccountId) bool { return false } diff --git a/protocol/streaming/types/interface.go b/protocol/streaming/types/interface.go index 5b42864016e..33907fc1ecc 100644 --- a/protocol/streaming/types/interface.go +++ b/protocol/streaming/types/interface.go @@ -42,11 +42,6 @@ type FullNodeStreamingManager interface { takerOrder clobtypes.StreamTakerOrder, ctx sdk.Context, ) - SendFinalizedSubaccountUpdates( - subaccountUpdates []satypes.StreamSubaccountUpdate, - blockHeight uint32, - execMode sdk.ExecMode, - ) SendSubaccountUpdate( ctx sdk.Context, subaccountUpdate satypes.StreamSubaccountUpdate, diff --git a/protocol/x/clob/types/expected_keepers.go b/protocol/x/clob/types/expected_keepers.go index b14c4aeacdf..384d950604e 100644 --- a/protocol/x/clob/types/expected_keepers.go +++ b/protocol/x/clob/types/expected_keepers.go @@ -85,10 +85,6 @@ type SubaccountsKeeper interface { quantums *big.Int, perpetualId uint32, ) error - SendFinalizedSubaccountUpdates( - ctx sdk.Context, - subaccountUpdates []satypes.StreamSubaccountUpdate, - ) } type AssetsKeeper interface { diff --git a/protocol/x/subaccounts/keeper/subaccount.go b/protocol/x/subaccounts/keeper/subaccount.go index e72ccd60e7f..52d4c05bb33 100644 --- a/protocol/x/subaccounts/keeper/subaccount.go +++ b/protocol/x/subaccounts/keeper/subaccount.go @@ -825,19 +825,3 @@ func (k Keeper) GetAllRelevantPerpetuals( func (k Keeper) GetFullNodeStreamingManager() streamingtypes.FullNodeStreamingManager { return k.streamingManager } - -// SendFinalizedSubaccountUpdates sends the subaccount updates to the gRPC streaming manager. -func (k Keeper) SendFinalizedSubaccountUpdates( - ctx sdk.Context, - subaccountUpdates []types.StreamSubaccountUpdate, -) { - lib.AssertDeliverTxMode(ctx) - if len(subaccountUpdates) == 0 { - return - } - k.GetFullNodeStreamingManager().SendFinalizedSubaccountUpdates( - subaccountUpdates, - lib.MustConvertIntegerToUint32(ctx.BlockHeight()), - ctx.ExecMode(), - ) -} diff --git a/protocol/x/subaccounts/types/types.go b/protocol/x/subaccounts/types/types.go index cbccc9d2b91..3e180d05c3d 100644 --- a/protocol/x/subaccounts/types/types.go +++ b/protocol/x/subaccounts/types/types.go @@ -77,8 +77,4 @@ type SubaccountsKeeper interface { perpetualId uint32, blockHeight uint32, ) error - SendFinalizedSubaccountUpdates( - ctx sdk.Context, - subaccountUpdates []StreamSubaccountUpdate, - ) }