diff --git a/protocol/app/app.go b/protocol/app/app.go index 749ba6fa270..400ffb5aae3 100644 --- a/protocol/app/app.go +++ b/protocol/app/app.go @@ -1673,6 +1673,7 @@ func (app *App) EndBlocker(ctx sdk.Context) (sdk.EndBlock, error) { } block := app.IndexerEventManager.ProduceBlock(ctx) app.IndexerEventManager.SendOnchainData(block) + app.GrpcStreamingManager.EmitMetrics() return response, err } @@ -1690,6 +1691,7 @@ func (app *App) PrepareCheckStater(ctx sdk.Context) { if err := app.ModuleManager.PrepareCheckState(ctx); err != nil { panic(err) } + app.GrpcStreamingManager.EmitMetrics() } // InitChainer application update at chain initialization. diff --git a/protocol/app/flags/flags.go b/protocol/app/flags/flags.go index 9f8d1a8f742..dddd8d902a0 100644 --- a/protocol/app/flags/flags.go +++ b/protocol/app/flags/flags.go @@ -54,7 +54,7 @@ const ( DefaultGrpcStreamingEnabled = false // TODO(jonfung) better value after stress testing - DefaultGrpcStreamingBufferSize = 100 + DefaultGrpcStreamingBufferSize = 1000 DefaultVEOracleEnabled = true ) diff --git a/protocol/lib/metrics/metric_keys.go b/protocol/lib/metrics/metric_keys.go index bba17887a87..cee7f70acca 100644 --- a/protocol/lib/metrics/metric_keys.go +++ b/protocol/lib/metrics/metric_keys.go @@ -68,6 +68,8 @@ const ( FullNodeGrpc = "full_node_grpc" GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency" GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency" + GrpcStreamingBufferSize = "grpc_streaming_buffer_size" + GrpcStreamingNumConnections = "grpc_streaming_num_connections" EndBlocker = "end_blocker" EndBlockerLag = "end_blocker_lag" ) diff --git a/protocol/streaming/grpc/grpc_streaming_manager.go b/protocol/streaming/grpc/grpc_streaming_manager.go index 05f1dd491b6..7aacc143975 100644 --- a/protocol/streaming/grpc/grpc_streaming_manager.go +++ b/protocol/streaming/grpc/grpc_streaming_manager.go @@ -1,6 +1,7 @@ package grpc import ( + "fmt" "sync" "time" @@ -81,6 +82,11 @@ func (sm *GrpcStreamingManagerImpl) Stop() { close(sm.updateBuffer) } +func (sm *GrpcStreamingManagerImpl) EmitMetrics() { + metrics.SetGauge(metrics.GrpcStreamingBufferSize, float32(len(sm.updateBuffer))) + metrics.SetGauge(metrics.GrpcStreamingNumConnections, float32(len(sm.orderbookSubscriptions))) +} + func (sm *GrpcStreamingManagerImpl) sendUpdateResponse( internalResponse bufferInternalResponse, ) { @@ -129,7 +135,7 @@ func (sm *GrpcStreamingManagerImpl) Subscribe( sm.orderbookSubscriptions[sm.nextSubscriptionId] = subscription sm.nextSubscriptionId++ - + sm.logger.Info(fmt.Sprintf("New GRPC Stream Connection established, %+v", clobPairIds)) return nil } diff --git a/protocol/streaming/grpc/noop_streaming_manager.go b/protocol/streaming/grpc/noop_streaming_manager.go index fdce4bfb522..3a79eab23eb 100644 --- a/protocol/streaming/grpc/noop_streaming_manager.go +++ b/protocol/streaming/grpc/noop_streaming_manager.go @@ -48,3 +48,6 @@ func (sm *NoopGrpcStreamingManager) GetUninitializedClobPairIds() []uint32 { func (sm *NoopGrpcStreamingManager) Stop() { } + +func (sm *NoopGrpcStreamingManager) EmitMetrics() { +} diff --git a/protocol/streaming/grpc/types/manager.go b/protocol/streaming/grpc/types/manager.go index 198493f5079..35a1ec7307b 100644 --- a/protocol/streaming/grpc/types/manager.go +++ b/protocol/streaming/grpc/types/manager.go @@ -23,6 +23,7 @@ type GrpcStreamingManager interface { execMode sdk.ExecMode, ) Stop() + EmitMetrics() SendOrderbookFillUpdates( orderbookFills []clobtypes.StreamOrderbookFill, blockHeight uint32,