Skip to content

Commit

Permalink
metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
jonfung-dydx committed May 17, 2024
1 parent 8d67fbf commit 0ada61e
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 2 deletions.
2 changes: 2 additions & 0 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -1673,6 +1673,7 @@ func (app *App) EndBlocker(ctx sdk.Context) (sdk.EndBlock, error) {
}
block := app.IndexerEventManager.ProduceBlock(ctx)
app.IndexerEventManager.SendOnchainData(block)
app.GrpcStreamingManager.EmitMetrics()
return response, err
}

Expand All @@ -1690,6 +1691,7 @@ func (app *App) PrepareCheckStater(ctx sdk.Context) {
if err := app.ModuleManager.PrepareCheckState(ctx); err != nil {
panic(err)
}
app.GrpcStreamingManager.EmitMetrics()
}

// InitChainer application update at chain initialization.
Expand Down
2 changes: 1 addition & 1 deletion protocol/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ const (

DefaultGrpcStreamingEnabled = false
// TODO(jonfung) better value after stress testing
DefaultGrpcStreamingBufferSize = 100
DefaultGrpcStreamingBufferSize = 1000
DefaultVEOracleEnabled = true
)

Expand Down
2 changes: 2 additions & 0 deletions protocol/lib/metrics/metric_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ const (
FullNodeGrpc = "full_node_grpc"
GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency"
GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency"
GrpcStreamingBufferSize = "grpc_streaming_buffer_size"
GrpcStreamingNumConnections = "grpc_streaming_num_connections"
EndBlocker = "end_blocker"
EndBlockerLag = "end_blocker_lag"
)
8 changes: 7 additions & 1 deletion protocol/streaming/grpc/grpc_streaming_manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package grpc

import (
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -81,6 +82,11 @@ func (sm *GrpcStreamingManagerImpl) Stop() {
close(sm.updateBuffer)
}

func (sm *GrpcStreamingManagerImpl) EmitMetrics() {
metrics.SetGauge(metrics.GrpcStreamingBufferSize, float32(len(sm.updateBuffer)))
metrics.SetGauge(metrics.GrpcStreamingNumConnections, float32(len(sm.orderbookSubscriptions)))
}

func (sm *GrpcStreamingManagerImpl) sendUpdateResponse(
internalResponse bufferInternalResponse,
) {
Expand Down Expand Up @@ -129,7 +135,7 @@ func (sm *GrpcStreamingManagerImpl) Subscribe(

sm.orderbookSubscriptions[sm.nextSubscriptionId] = subscription
sm.nextSubscriptionId++

sm.logger.Info(fmt.Sprintf("New GRPC Stream Connection established, %+v", clobPairIds))
return nil
}

Expand Down
3 changes: 3 additions & 0 deletions protocol/streaming/grpc/noop_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,6 @@ func (sm *NoopGrpcStreamingManager) GetUninitializedClobPairIds() []uint32 {

func (sm *NoopGrpcStreamingManager) Stop() {
}

func (sm *NoopGrpcStreamingManager) EmitMetrics() {
}
1 change: 1 addition & 0 deletions protocol/streaming/grpc/types/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type GrpcStreamingManager interface {
execMode sdk.ExecMode,
)
Stop()
EmitMetrics()
SendOrderbookFillUpdates(
orderbookFills []clobtypes.StreamOrderbookFill,
blockHeight uint32,
Expand Down

0 comments on commit 0ada61e

Please sign in to comment.