Skip to content

Commit

Permalink
Add a channel buffer to decouple abci and grpc streaming (backport #1530
Browse files Browse the repository at this point in the history
) (#1595)

Co-authored-by: Jonathan Fung <[email protected]>
Co-authored-by: Jonathan Fung <[email protected]>
  • Loading branch information
3 people authored May 28, 2024
1 parent efa59b4 commit f15e98f
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 113 deletions.
11 changes: 9 additions & 2 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package app
import (
"context"
"encoding/json"
"fmt"
"io"
"math/big"
"net/http"
Expand Down Expand Up @@ -425,6 +426,9 @@ func New(
if app.Server != nil {
app.Server.Stop()
}
if app.GrpcStreamingManager != nil {
app.GrpcStreamingManager.Stop()
}
return nil
},
)
Expand Down Expand Up @@ -1510,6 +1514,7 @@ func (app *App) EndBlocker(ctx sdk.Context) (sdk.EndBlock, error) {
}
block := app.IndexerEventManager.ProduceBlock(ctx)
app.IndexerEventManager.SendOnchainData(block)
app.GrpcStreamingManager.EmitMetrics()
return response, err
}

Expand All @@ -1527,6 +1532,7 @@ func (app *App) PrepareCheckStater(ctx sdk.Context) {
if err := app.ModuleManager.PrepareCheckState(ctx); err != nil {
panic(err)
}
app.GrpcStreamingManager.EmitMetrics()
}

// InitChainer application update at chain initialization.
Expand Down Expand Up @@ -1765,8 +1771,9 @@ func getGrpcStreamingManagerFromOptions(
logger log.Logger,
) (manager streamingtypes.GrpcStreamingManager) {
if appFlags.GrpcStreamingEnabled {
logger.Info("GRPC streaming is enabled")
return streaming.NewGrpcStreamingManager()
grpcStreamingBufferSize := uint32(appFlags.GrpcStreamingBufferSize)
logger.Info(fmt.Sprintf("GRPC streaming is enabled with buffer size %d", grpcStreamingBufferSize))
return streaming.NewGrpcStreamingManager(logger, grpcStreamingBufferSize)
}
return streaming.NewNoopGrpcStreamingManager()
}
23 changes: 19 additions & 4 deletions protocol/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ type Flags struct {
GrpcEnable bool

// Grpc Streaming
GrpcStreamingEnabled bool
GrpcStreamingEnabled bool
GrpcStreamingBufferSize uint16
}

// List of CLI flags.
Expand All @@ -36,7 +37,8 @@ const (
GrpcEnable = "grpc.enable"

// Grpc Streaming
GrpcStreamingEnabled = "grpc-streaming-enabled"
GrpcStreamingEnabled = "grpc-streaming-enabled"
GrpcStreamingBufferSize = "grpc-streaming-buffer-size"
)

// Default values.
Expand All @@ -46,7 +48,8 @@ const (
DefaultNonValidatingFullNode = false
DefaultDdErrorTrackingFormat = false

DefaultGrpcStreamingEnabled = false
DefaultGrpcStreamingEnabled = false
DefaultGrpcStreamingBufferSize = 1000
)

// AddFlagsToCmd adds flags to app initialization.
Expand Down Expand Up @@ -80,6 +83,11 @@ func AddFlagsToCmd(cmd *cobra.Command) {
DefaultGrpcStreamingEnabled,
"Whether to enable grpc streaming for full nodes",
)
cmd.Flags().Uint16(
GrpcStreamingBufferSize,
DefaultGrpcStreamingBufferSize,
"Protocol-side buffer channel size to store grpc stream updates before dropping messages",
)
}

// Validate checks that the flags are valid.
Expand Down Expand Up @@ -114,7 +122,8 @@ func GetFlagValuesFromOptions(
GrpcAddress: config.DefaultGRPCAddress,
GrpcEnable: true,

GrpcStreamingEnabled: DefaultGrpcStreamingEnabled,
GrpcStreamingEnabled: DefaultGrpcStreamingEnabled,
GrpcStreamingBufferSize: DefaultGrpcStreamingBufferSize,
}

// Populate the flags if they exist.
Expand Down Expand Up @@ -160,5 +169,11 @@ func GetFlagValuesFromOptions(
}
}

if option := appOpts.Get(GrpcStreamingBufferSize); option != nil {
if v, err := cast.ToUint16E(option); err == nil {
result.GrpcStreamingBufferSize = v
}
}

return result
}
29 changes: 26 additions & 3 deletions protocol/app/flags/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ func TestAddFlagsToCommand(t *testing.T) {
fmt.Sprintf("Has %s flag", flags.GrpcStreamingEnabled): {
flagName: flags.GrpcStreamingEnabled,
},
fmt.Sprintf("Has %s flag", flags.GrpcStreamingBufferSize): {
flagName: flags.GrpcStreamingBufferSize,
},
}

for name, tc := range tests {
Expand Down Expand Up @@ -63,9 +66,10 @@ func TestValidate(t *testing.T) {
},
"success - gRPC streaming enabled for validating nodes": {
flags: flags.Flags{
NonValidatingFullNode: false,
GrpcEnable: true,
GrpcStreamingEnabled: true,
NonValidatingFullNode: false,
GrpcEnable: true,
GrpcStreamingEnabled: true,
GrpcStreamingBufferSize: 15,
},
},
"failure - gRPC disabled": {
Expand Down Expand Up @@ -107,6 +111,7 @@ func TestGetFlagValuesFromOptions(t *testing.T) {
expectedGrpcAddress string
expectedGrpcEnable bool
expectedGrpcStreamingEnable bool
expectedGrpcStreamingBufferSize uint16
}{
"Sets to default if unset": {
expectedNonValidatingFullNodeFlag: false,
Expand All @@ -115,6 +120,7 @@ func TestGetFlagValuesFromOptions(t *testing.T) {
expectedGrpcAddress: "localhost:9090",
expectedGrpcEnable: true,
expectedGrpcStreamingEnable: false,
expectedGrpcStreamingBufferSize: 1000,
},
"Sets values from options": {
optsMap: map[string]any{
Expand All @@ -124,13 +130,15 @@ func TestGetFlagValuesFromOptions(t *testing.T) {
flags.GrpcEnable: false,
flags.GrpcAddress: "localhost:9091",
flags.GrpcStreamingEnabled: "true",
flags.GrpcStreamingBufferSize: "15",
},
expectedNonValidatingFullNodeFlag: true,
expectedDdAgentHost: "agentHostTest",
expectedDdTraceAgentPort: 777,
expectedGrpcEnable: false,
expectedGrpcAddress: "localhost:9091",
expectedGrpcStreamingEnable: true,
expectedGrpcStreamingBufferSize: 15,
},
}

Expand Down Expand Up @@ -168,6 +176,21 @@ func TestGetFlagValuesFromOptions(t *testing.T) {
tc.expectedGrpcAddress,
flags.GrpcAddress,
)
require.Equal(
t,
tc.expectedGrpcAddress,
flags.GrpcAddress,
)
require.Equal(
t,
tc.expectedGrpcStreamingEnable,
flags.GrpcStreamingEnabled,
)
require.Equal(
t,
tc.expectedGrpcStreamingBufferSize,
flags.GrpcStreamingBufferSize,
)
})
}
}
2 changes: 2 additions & 0 deletions protocol/lib/metrics/metric_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ const (
FullNodeGrpc = "full_node_grpc"
GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency"
GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency"
GrpcStreamingBufferSize = "grpc_streaming_buffer_size"
GrpcStreamingNumConnections = "grpc_streaming_num_connections"
EndBlocker = "end_blocker"
EndBlockerLag = "end_blocker_lag"
)
Loading

0 comments on commit f15e98f

Please sign in to comment.