Skip to content

Commit

Permalink
Enable using multiple mev telemetry hosts (#838)
Browse files Browse the repository at this point in the history
  • Loading branch information
lucas-dydx authored Dec 6, 2023
1 parent 77773c3 commit 9b3d1a2
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 37 deletions.
26 changes: 14 additions & 12 deletions protocol/x/clob/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package flags

import (
"fmt"
"strings"

servertypes "github.com/cosmos/cosmos-sdk/server/types"
"github.com/spf13/cast"
Expand All @@ -15,7 +16,7 @@ type ClobFlags struct {
MaxDeleveragingSubaccountsToIterate uint32

MevTelemetryEnabled bool
MevTelemetryHost string
MevTelemetryHosts []string
MevTelemetryIdentifier string
}

Expand All @@ -28,21 +29,24 @@ const (

// Mev.
MevTelemetryEnabled = "mev-telemetry-enabled"
MevTelemetryHost = "mev-telemetry-host"
MevTelemetryHosts = "mev-telemetry-hosts"
MevTelemetryIdentifier = "mev-telemetry-identifier"
)

// Default values.

const (
DefaultMaxLiquidationAttemptsPerBlock = 50
DefaultMaxDeleveragingAttemptsPerBlock = 10
DefaultMaxDeleveragingSubaccountsToIterate = 500

DefaultMevTelemetryEnabled = false
DefaultMevTelemetryHost = ""
DefaultMevTelemetryHostsFlag = ""
DefaultMevTelemetryIdentifier = ""
)

var DefaultMevTelemetryHosts = []string{}

// AddFlagsToCmd adds flags to app initialization.
// These flags should be applied to the `start` command of the V4 Cosmos application.
// E.g. `dydxprotocold start --non-validating-full-node true`.
Expand Down Expand Up @@ -77,14 +81,14 @@ func AddClobFlagsToCmd(cmd *cobra.Command) {
"Runs the MEV Telemetry collection agent if true.",
)
cmd.Flags().String(
MevTelemetryHost,
DefaultMevTelemetryHost,
"Sets the address to connect to for the MEV Telemetry collection agent.",
MevTelemetryHosts,
DefaultMevTelemetryHostsFlag,
"Sets the addresses (comma-delimited) to connect to the MEV Telemetry collection agents.",
)
cmd.Flags().String(
MevTelemetryIdentifier,
DefaultMevTelemetryIdentifier,
"Sets the identifier to use for MEV Telemetry collection agent.",
"Sets the identifier to use for MEV Telemetry collection agents.",
)
}

Expand All @@ -94,7 +98,7 @@ func GetDefaultClobFlags() ClobFlags {
MaxDeleveragingAttemptsPerBlock: DefaultMaxDeleveragingAttemptsPerBlock,
MaxDeleveragingSubaccountsToIterate: DefaultMaxDeleveragingSubaccountsToIterate,
MevTelemetryEnabled: DefaultMevTelemetryEnabled,
MevTelemetryHost: DefaultMevTelemetryHost,
MevTelemetryHosts: DefaultMevTelemetryHosts,
MevTelemetryIdentifier: DefaultMevTelemetryIdentifier,
}
}
Expand All @@ -114,10 +118,8 @@ func GetClobFlagValuesFromOptions(
}
}

if option := appOpts.Get(MevTelemetryHost); option != nil {
if v, err := cast.ToStringE(option); err == nil {
result.MevTelemetryHost = v
}
if v, ok := appOpts.Get(MevTelemetryHosts).(string); ok {
result.MevTelemetryHosts = strings.Split(v, ",")
}

if option := appOpts.Get(MevTelemetryIdentifier); option != nil {
Expand Down
35 changes: 25 additions & 10 deletions protocol/x/clob/flags/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ func TestAddFlagsToCommand(t *testing.T) {
fmt.Sprintf("Has %s flag", flags.MaxDeleveragingAttemptsPerBlock): {
flagName: flags.MaxDeleveragingAttemptsPerBlock,
},
fmt.Sprintf("Has %s flag", flags.MevTelemetryHost): {
flagName: flags.MevTelemetryHost,
fmt.Sprintf("Has %s flag", flags.MevTelemetryHosts): {
flagName: flags.MevTelemetryHosts,
},
fmt.Sprintf("Has %s flag", flags.MevTelemetryIdentifier): {
flagName: flags.MevTelemetryIdentifier,
}}
},
}

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
Expand All @@ -47,28 +48,42 @@ func TestGetFlagValuesFromOptions(t *testing.T) {
expectedMaxLiquidationAttemptsPerBlock uint32
expectedMaxDeleveragingAttemptsPerBlock uint32
expectedMaxDeleveragingSubaccountsToIterate uint32
expectedMevTelemetryHost string
expectedMevTelemetryHosts []string
expectedMevTelemetryIdentifier string
}{
"Sets to default if unset": {
expectedMaxLiquidationAttemptsPerBlock: flags.DefaultMaxLiquidationAttemptsPerBlock,
expectedMaxDeleveragingAttemptsPerBlock: flags.DefaultMaxDeleveragingAttemptsPerBlock,
expectedMaxDeleveragingSubaccountsToIterate: flags.DefaultMaxDeleveragingSubaccountsToIterate,
expectedMevTelemetryHost: flags.DefaultMevTelemetryHost,
expectedMevTelemetryHosts: flags.DefaultMevTelemetryHosts,
expectedMevTelemetryIdentifier: flags.DefaultMevTelemetryIdentifier,
},
"Sets values from options": {
"Sets values from options with one host": {
optsMap: map[string]any{
flags.MaxLiquidationAttemptsPerBlock: uint32(50),
flags.MaxDeleveragingAttemptsPerBlock: uint32(25),
flags.MaxDeleveragingSubaccountsToIterate: uint32(100),
flags.MevTelemetryHosts: "https://localhost:13137",
flags.MevTelemetryIdentifier: "node-agent-01",
},
expectedMaxLiquidationAttemptsPerBlock: uint32(50),
expectedMaxDeleveragingAttemptsPerBlock: uint32(25),
expectedMaxDeleveragingSubaccountsToIterate: uint32(100),
expectedMevTelemetryHosts: []string{"https://localhost:13137"},
expectedMevTelemetryIdentifier: "node-agent-01",
},
"Sets values from options with multiple hosts": {
optsMap: map[string]any{
flags.MaxLiquidationAttemptsPerBlock: uint32(50),
flags.MaxDeleveragingAttemptsPerBlock: uint32(25),
flags.MaxDeleveragingSubaccountsToIterate: uint32(100),
flags.MevTelemetryHost: "https://localhost:13137",
flags.MevTelemetryHosts: "https://localhost:13137,https://example.dev:443",
flags.MevTelemetryIdentifier: "node-agent-01",
},
expectedMaxLiquidationAttemptsPerBlock: uint32(50),
expectedMaxDeleveragingAttemptsPerBlock: uint32(25),
expectedMaxDeleveragingSubaccountsToIterate: uint32(100),
expectedMevTelemetryHost: "https://localhost:13137",
expectedMevTelemetryHosts: []string{"https://localhost:13137", "https://example.dev:443"},
expectedMevTelemetryIdentifier: "node-agent-01",
},
}
Expand All @@ -84,8 +99,8 @@ func TestGetFlagValuesFromOptions(t *testing.T) {
flags := flags.GetClobFlagValuesFromOptions(&mockOpts)
require.Equal(
t,
tc.expectedMevTelemetryHost,
flags.MevTelemetryHost,
tc.expectedMevTelemetryHosts,
flags.MevTelemetryHosts,
)
require.Equal(
t,
Expand Down
8 changes: 5 additions & 3 deletions protocol/x/clob/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ type (
}
)

var _ types.ClobKeeper = &Keeper{}
var _ types.MemClobKeeper = &Keeper{}
var (
_ types.ClobKeeper = &Keeper{}
_ types.MemClobKeeper = &Keeper{}
)

func NewKeeper(
cdc codec.BinaryCodec,
Expand Down Expand Up @@ -105,7 +107,7 @@ func NewKeeper(
txDecoder: txDecoder,
mevTelemetryConfig: MevTelemetryConfig{
Enabled: clobFlags.MevTelemetryEnabled,
Host: clobFlags.MevTelemetryHost,
Hosts: clobFlags.MevTelemetryHosts,
Identifier: clobFlags.MevTelemetryIdentifier,
},
Flags: clobFlags,
Expand Down
6 changes: 3 additions & 3 deletions protocol/x/clob/keeper/mev.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

type MevTelemetryConfig struct {
Enabled bool
Host string
Hosts []string
Identifier string
}

Expand Down Expand Up @@ -320,7 +320,7 @@ func (k Keeper) RecordMevMetrics(
mevPerMarket[clobPairId] = mev
}

if k.mevTelemetryConfig.Host != "" {
if len(k.mevTelemetryConfig.Hosts) != 0 {
mevClobMidPrices := make([]types.ClobMidPrice, 0, len(clobPairs))
for _, clobPair := range clobPairs {
mevClobMidPrices = append(
Expand All @@ -333,7 +333,7 @@ func (k Keeper) RecordMevMetrics(
}
go mev_telemetry.SendDatapoints(
ctx,
k.mevTelemetryConfig.Host,
k.mevTelemetryConfig.Hosts,
types.MevMetrics{
MevDatapoint: types.MEVDatapoint{
Height: lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
Expand Down
22 changes: 13 additions & 9 deletions protocol/x/clob/mev_telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ import (
)

var client = &http.Client{
Timeout: 2 * time.Second,
Timeout: 30 * time.Second,
}

func logger(ctx sdk.Context) log.Logger {
return ctx.Logger().With(sdklog.ModuleKey, "x/clob/mev_telemetry")
}

// SendDatapoints sends MEV metrics to an HTTP-based metric collection service
func SendDatapoints(ctx sdk.Context, address string, mevMetrics types.MevMetrics) {
func SendDatapoints(ctx sdk.Context, addresses []string, mevMetrics types.MevMetrics) {
defer telemetry.ModuleMeasureSince(types.ModuleName, time.Now(), metrics.MevSentDatapoints, metrics.Latency)

defer func() {
Expand All @@ -40,40 +40,44 @@ func SendDatapoints(ctx sdk.Context, address string, mevMetrics types.MevMetrics
}
}()

data, err := json.Marshal(mevMetrics)
for _, address := range addresses {
sendDatapointsToTelemetryService(ctx, address, mevMetrics)
}
}

func sendDatapointsToTelemetryService(ctx sdk.Context, address string, mevMetrics types.MevMetrics) {
data, err := json.Marshal(mevMetrics)
if err != nil {
logger(ctx).Error("error marshalling mev metrics", "error", err)
telemetry.IncrCounter(1, types.ModuleName, metrics.MevSentDatapoints, metrics.Error, metrics.Count)
return
}

resp, err := client.Post(address, "application/json", bytes.NewBuffer(data))

if err != nil {
logger(ctx).Error("error sending mev metric", "error", err)
logger(ctx).Error("error sending mev metric", "error", address, "error", err)
telemetry.IncrCounter(1, types.ModuleName, metrics.MevSentDatapoints, metrics.Error, metrics.Count)
return
}

defer resp.Body.Close()

responseBody, err := io.ReadAll(resp.Body)

if err != nil {
logger(ctx).Error("error reading response", "error", err)
logger(ctx).Error("error reading response", "address", address, "error", err)
telemetry.IncrCounter(1, types.ModuleName, metrics.MevSentDatapoints, metrics.Error, metrics.Count)
return
}

if len(responseBody) == 0 {
logger(ctx).Error("0-byte response from mev telemetry server")
logger(ctx).Error("0-byte response from mev telemetry server", "address", address)
telemetry.IncrCounter(1, types.ModuleName, metrics.MevSentDatapoints, metrics.Error, metrics.Count)
return
}

if resp.StatusCode != http.StatusOK {
logger(ctx).Error("error sending mev datapoint", "error", "non-200 http status-code", "status_code", resp.StatusCode)
logger(ctx).Error("error sending mev datapoint", "address", address,
"error", "non-200 http status-code", "status_code", resp.StatusCode)
telemetry.IncrCounter(1, types.ModuleName, metrics.MevSentDatapoints, metrics.Error, metrics.Count)
return
}
Expand Down

0 comments on commit 9b3d1a2

Please sign in to comment.