Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FNS changes on top of 5.2.x #2098

Merged
merged 31 commits into from
Aug 26, 2024
Merged
Changes from 1 commit
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
b232282
[CT-964] Send deleveraging events in grpc stream (#1903)
dydxwill Jul 12, 2024
d113cb7
gRPC streaming clean up (#1906)
jayy04 Jul 13, 2024
41c4057
Post only order breaks out of matching loop + Add post only crosses m…
jonfung-dydx Aug 1, 2024
ec44fce
GPRC Streaming change default flag options, break upon connection ord…
jonfung-dydx Aug 2, 2024
987134d
Full Node Status taker order status protos (#2003)
jonfung-dydx Aug 2, 2024
f627b04
Fix proto formatting (#2020)
jonfung-dydx Aug 2, 2024
5bf5e3d
FNS protos - add taker order in stream update oneof (#2021)
jonfung-dydx Aug 5, 2024
566282b
add subaccount support for grpc stream (#1992)
dydxwill Aug 6, 2024
460454f
Fix orders issue for deleveraging events (#1958)
dydxwill Jul 24, 2024
73d39d3
Restructure FNS global cache to be list (#2036)
dydxwill Aug 7, 2024
a7d0270
Bump grpc stream flag default values (#2051)
dydxwill Aug 7, 2024
8b6c3d2
Bump grpc stream flag default values (#2051)
dydxwill Aug 7, 2024
a4bed83
[Full node streaming] emit taker order status at end of matching loop…
jonfung-dydx Aug 8, 2024
c23eb39
fix merge conflict and metric emissions (#2065)
jonfung-dydx Aug 8, 2024
3da874b
Full node streaming batch size reset to 2000, properly zero out cache…
jonfung-dydx Aug 9, 2024
3f76b54
FNS subaccount implementation (#2059)
dydxwill Aug 12, 2024
6b8e014
[CT-1050] DeliverTx state change reset for subaccount updates (#2063)
dydxwill Aug 12, 2024
5297fbd
Fix snapshot bool (#2078)
dydxwill Aug 12, 2024
0fa54f4
Full Node Streaming Recurring snapshots (#2079)
jonfung-dydx Aug 13, 2024
e06739d
Remove todo (#2087)
dydxwill Aug 14, 2024
66e7d98
Add websocket support to full node streaming (#1908)
jayy04 Aug 14, 2024
b960fe4
Full Node Streaming default port 9092 (#2092)
jonfung-dydx Aug 15, 2024
1ee13ed
[CT-1103] FNS subaccount WS support (#2088)
dydxwill Aug 15, 2024
06f1012
fix merge conflicts (#2115)
dydxwill Aug 23, 2024
2e62083
modify gh wf
dydxwill Aug 26, 2024
8a0261f
paginate liquidation daemon response (backport #2118) (#2119)
mergify[bot] Aug 21, 2024
06cba16
add telemetry and logs for liquidation daemon (backport #2122) (#2123)
mergify[bot] Aug 21, 2024
b839bb3
skip liquidation task loop if last committed block height is the same…
mergify[bot] Aug 21, 2024
08be416
revert gh wf
dydxwill Aug 26, 2024
086e07c
Full node streaming remove minimum for snapshot interval flag (#2138)
jonfung-dydx Aug 22, 2024
a2bf192
Support empty params for websocket endpoint (#2111)
dydxwill Aug 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
paginate liquidation daemon response (backport #2118) (#2119)
Co-authored-by: jayy04 <[email protected]>
2 people authored and dydxwill committed Aug 26, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit 8a0261f2344a05d3f109377a1b2825f2da380a41
29 changes: 22 additions & 7 deletions protocol/daemons/flags/flags.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package flags

import (
"time"

servertypes "github.com/cosmos/cosmos-sdk/server/types"
oracleconfig "github.com/skip-mev/slinky/oracle/config"
"github.com/spf13/cast"
"github.com/spf13/cobra"
"time"
)

// List of CLI flags for Server and Client.
@@ -22,9 +23,10 @@ const (
FlagBridgeDaemonLoopDelayMs = "bridge-daemon-loop-delay-ms"
FlagBridgeDaemonEthRpcEndpoint = "bridge-daemon-eth-rpc-endpoint"

FlagLiquidationDaemonEnabled = "liquidation-daemon-enabled"
FlagLiquidationDaemonLoopDelayMs = "liquidation-daemon-loop-delay-ms"
FlagLiquidationDaemonQueryPageLimit = "liquidation-daemon-query-page-limit"
FlagLiquidationDaemonEnabled = "liquidation-daemon-enabled"
FlagLiquidationDaemonLoopDelayMs = "liquidation-daemon-loop-delay-ms"
FlagLiquidationDaemonQueryPageLimit = "liquidation-daemon-query-page-limit"
FlagLiquidationDaemonResponsePageLimit = "liquidation-daemon-response-page-limit"

// Oracle flags
FlagOracleEnabled = "oracle.enabled"
@@ -62,6 +64,8 @@ type LiquidationFlags struct {
LoopDelayMs uint32
// QueryPageLimit configures the pagination limit for fetching subaccounts.
QueryPageLimit uint64
// ResponsePageLimit configures the pagination limit for the response to application.
ResponsePageLimit uint64
}

// PriceFlags contains configuration flags for the Price Daemon.
@@ -102,9 +106,10 @@ func GetDefaultDaemonFlags() DaemonFlags {
EthRpcEndpoint: "",
},
Liquidation: LiquidationFlags{
Enabled: true,
LoopDelayMs: 1_600,
QueryPageLimit: 1_000,
Enabled: true,
LoopDelayMs: 1_600,
QueryPageLimit: 1_000,
ResponsePageLimit: 2_000,
},
Price: PriceFlags{
Enabled: false,
@@ -183,6 +188,11 @@ func AddDaemonFlagsToCmd(
df.Liquidation.QueryPageLimit,
"Limit on the number of items to fetch per query in the Liquidation Daemon task loop.",
)
cmd.Flags().Uint64(
FlagLiquidationDaemonResponsePageLimit,
df.Liquidation.ResponsePageLimit,
"Limit on the number of items to send to the main application in the Liquidation Daemon task loop.",
)

// Price Daemon.
cmd.Flags().Bool(
@@ -276,6 +286,11 @@ func GetDaemonFlagValuesFromOptions(
result.Liquidation.QueryPageLimit = v
}
}
if option := appOpts.Get(FlagLiquidationDaemonResponsePageLimit); option != nil {
if v, err := cast.ToUint64E(option); err == nil {
result.Liquidation.ResponsePageLimit = v
}
}

// Price Daemon.
if option := appOpts.Get(FlagPriceDaemonEnabled); option != nil {
3 changes: 3 additions & 0 deletions protocol/daemons/flags/flags_test.go
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@ func TestAddDaemonFlagsToCmd(t *testing.T) {
flags.FlagLiquidationDaemonEnabled,
flags.FlagLiquidationDaemonLoopDelayMs,
flags.FlagLiquidationDaemonQueryPageLimit,
flags.FlagLiquidationDaemonResponsePageLimit,

flags.FlagPriceDaemonEnabled,
flags.FlagPriceDaemonLoopDelayMs,
@@ -53,6 +54,7 @@ func TestGetDaemonFlagValuesFromOptions_Custom(t *testing.T) {
optsMap[flags.FlagLiquidationDaemonEnabled] = true
optsMap[flags.FlagLiquidationDaemonLoopDelayMs] = uint32(2222)
optsMap[flags.FlagLiquidationDaemonQueryPageLimit] = uint64(3333)
optsMap[flags.FlagLiquidationDaemonResponsePageLimit] = uint64(4444)

optsMap[flags.FlagPriceDaemonEnabled] = true
optsMap[flags.FlagPriceDaemonLoopDelayMs] = uint32(4444)
@@ -83,6 +85,7 @@ func TestGetDaemonFlagValuesFromOptions_Custom(t *testing.T) {
require.Equal(t, optsMap[flags.FlagLiquidationDaemonEnabled], r.Liquidation.Enabled)
require.Equal(t, optsMap[flags.FlagLiquidationDaemonLoopDelayMs], r.Liquidation.LoopDelayMs)
require.Equal(t, optsMap[flags.FlagLiquidationDaemonQueryPageLimit], r.Liquidation.QueryPageLimit)
require.Equal(t, optsMap[flags.FlagLiquidationDaemonResponsePageLimit], r.Liquidation.ResponsePageLimit)

// Price Daemon.
require.Equal(t, optsMap[flags.FlagPriceDaemonEnabled], r.Price.Enabled)
143 changes: 135 additions & 8 deletions protocol/daemons/liquidation/client/grpc_helper.go
Original file line number Diff line number Diff line change
@@ -211,6 +211,7 @@ func (c *Client) SendLiquidatableSubaccountIds(
liquidatableSubaccountIds []satypes.SubaccountId,
negativeTncSubaccountIds []satypes.SubaccountId,
openPositionInfoMap map[uint32]*clobtypes.SubaccountOpenPositionInfo,
pageLimit uint64,
) error {
defer telemetry.ModuleMeasureSince(
metrics.LiquidationDaemon,
@@ -241,19 +242,145 @@ func (c *Client) SendLiquidatableSubaccountIds(
subaccountOpenPositionInfo = append(subaccountOpenPositionInfo, *openPositionInfoMap[perpetualId])
}

request := &api.LiquidateSubaccountsRequest{
BlockHeight: blockHeight,
LiquidatableSubaccountIds: liquidatableSubaccountIds,
NegativeTncSubaccountIds: negativeTncSubaccountIds,
SubaccountOpenPositionInfo: subaccountOpenPositionInfo,
}
// Break this down to multiple requests if the number of subaccounts is too large.

// Liquidatable subaccount ids.
requests := GenerateLiquidateSubaccountsPaginatedRequests(
liquidatableSubaccountIds,
blockHeight,
pageLimit,
)

if _, err := c.LiquidationServiceClient.LiquidateSubaccounts(ctx, request); err != nil {
return err
// Negative TNC subaccount ids.
requests = append(
requests,
GenerateNegativeTNCSubaccountsPaginatedRequests(
negativeTncSubaccountIds,
blockHeight,
pageLimit,
)...,
)

// Subaccount open position info.
requests = append(
requests,
GenerateSubaccountOpenPositionPaginatedRequests(
subaccountOpenPositionInfo,
blockHeight,
pageLimit,
)...,
)

for _, req := range requests {
if _, err := c.LiquidationServiceClient.LiquidateSubaccounts(ctx, req); err != nil {
return err
}
}

return nil
}

func GenerateLiquidateSubaccountsPaginatedRequests(
ids []satypes.SubaccountId,
blockHeight uint32,
pageLimit uint64,
) []*api.LiquidateSubaccountsRequest {
if len(ids) == 0 {
return []*api.LiquidateSubaccountsRequest{
{
BlockHeight: blockHeight,
LiquidatableSubaccountIds: []satypes.SubaccountId{},
},
}
}

requests := make([]*api.LiquidateSubaccountsRequest, 0)
for start := 0; start < len(ids); start += int(pageLimit) {
end := lib.Min(start+int(pageLimit), len(ids))
request := &api.LiquidateSubaccountsRequest{
BlockHeight: blockHeight,
LiquidatableSubaccountIds: ids[start:end],
}
requests = append(requests, request)
}
return requests
}

func GenerateNegativeTNCSubaccountsPaginatedRequests(
ids []satypes.SubaccountId,
blockHeight uint32,
pageLimit uint64,
) []*api.LiquidateSubaccountsRequest {
if len(ids) == 0 {
return []*api.LiquidateSubaccountsRequest{
{
BlockHeight: blockHeight,
NegativeTncSubaccountIds: []satypes.SubaccountId{},
},
}
}

requests := make([]*api.LiquidateSubaccountsRequest, 0)
for start := 0; start < len(ids); start += int(pageLimit) {
end := lib.Min(start+int(pageLimit), len(ids))
request := &api.LiquidateSubaccountsRequest{
BlockHeight: blockHeight,
NegativeTncSubaccountIds: ids[start:end],
}
requests = append(requests, request)
}
return requests
}

func GenerateSubaccountOpenPositionPaginatedRequests(
subaccountOpenPositionInfo []clobtypes.SubaccountOpenPositionInfo,
blockHeight uint32,
pageLimit uint64,
) []*api.LiquidateSubaccountsRequest {
if len(subaccountOpenPositionInfo) == 0 {
return []*api.LiquidateSubaccountsRequest{
{
BlockHeight: blockHeight,
SubaccountOpenPositionInfo: []clobtypes.SubaccountOpenPositionInfo{},
},
}
}

requests := make([]*api.LiquidateSubaccountsRequest, 0)
for _, info := range subaccountOpenPositionInfo {
// Long positions.
for start := 0; start < len(info.SubaccountsWithLongPosition); start += int(pageLimit) {
end := lib.Min(start+int(pageLimit), len(info.SubaccountsWithLongPosition))
request := &api.LiquidateSubaccountsRequest{
BlockHeight: blockHeight,
SubaccountOpenPositionInfo: []clobtypes.SubaccountOpenPositionInfo{
{
PerpetualId: info.PerpetualId,
SubaccountsWithLongPosition: info.SubaccountsWithLongPosition[start:end],
},
},
}
requests = append(requests, request)
}

// Short positions.
for start := 0; start < len(info.SubaccountsWithShortPosition); start += int(pageLimit) {
end := lib.Min(start+int(pageLimit), len(info.SubaccountsWithShortPosition))
request := &api.LiquidateSubaccountsRequest{
BlockHeight: blockHeight,
SubaccountOpenPositionInfo: []clobtypes.SubaccountOpenPositionInfo{
{
PerpetualId: info.PerpetualId,
SubaccountsWithShortPosition: info.SubaccountsWithShortPosition[start:end],
},
},
}
requests = append(requests, request)
}
}
return requests
}

func newContextWithQueryBlockHeight(
ctx context.Context,
blockHeight uint32,
52 changes: 43 additions & 9 deletions protocol/daemons/liquidation/client/grpc_helper_test.go
Original file line number Diff line number Diff line change
@@ -469,22 +469,45 @@ func TestSendLiquidatableSubaccountIds(t *testing.T) {
req := &api.LiquidateSubaccountsRequest{
BlockHeight: uint32(50),
LiquidatableSubaccountIds: []satypes.SubaccountId{constants.Alice_Num0, constants.Bob_Num0},
NegativeTncSubaccountIds: []satypes.SubaccountId{constants.Carl_Num0, constants.Dave_Num0},
}
response := &api.LiquidateSubaccountsResponse{}
mck.On("LiquidateSubaccounts", ctx, req).Return(response, nil)

req = &api.LiquidateSubaccountsRequest{
BlockHeight: uint32(50),
NegativeTncSubaccountIds: []satypes.SubaccountId{constants.Carl_Num0, constants.Dave_Num0},
}
response = &api.LiquidateSubaccountsResponse{}
mck.On("LiquidateSubaccounts", ctx, req).Return(response, nil)

req = &api.LiquidateSubaccountsRequest{
BlockHeight: uint32(50),
SubaccountOpenPositionInfo: []clobtypes.SubaccountOpenPositionInfo{
{
PerpetualId: 0,
SubaccountsWithLongPosition: []satypes.SubaccountId{
constants.Alice_Num0,
constants.Carl_Num0,
},
},
},
}
response = &api.LiquidateSubaccountsResponse{}
mck.On("LiquidateSubaccounts", ctx, req).Return(response, nil)

req = &api.LiquidateSubaccountsRequest{
BlockHeight: uint32(50),
SubaccountOpenPositionInfo: []clobtypes.SubaccountOpenPositionInfo{
{
PerpetualId: 0,
SubaccountsWithShortPosition: []satypes.SubaccountId{
constants.Bob_Num0,
constants.Dave_Num0,
},
},
},
}
response := &api.LiquidateSubaccountsResponse{}
response = &api.LiquidateSubaccountsResponse{}
mck.On("LiquidateSubaccounts", ctx, req).Return(response, nil)
},
liquidatableSubaccountIds: []satypes.SubaccountId{
@@ -512,12 +535,24 @@ func TestSendLiquidatableSubaccountIds(t *testing.T) {
"Success Empty": {
setupMocks: func(ctx context.Context, mck *mocks.QueryClient) {
req := &api.LiquidateSubaccountsRequest{
BlockHeight: uint32(50),
LiquidatableSubaccountIds: []satypes.SubaccountId{},
}
response := &api.LiquidateSubaccountsResponse{}
mck.On("LiquidateSubaccounts", ctx, req).Return(response, nil)

req = &api.LiquidateSubaccountsRequest{
BlockHeight: uint32(50),
NegativeTncSubaccountIds: []satypes.SubaccountId{},
}
response = &api.LiquidateSubaccountsResponse{}
mck.On("LiquidateSubaccounts", ctx, req).Return(response, nil)

req = &api.LiquidateSubaccountsRequest{
BlockHeight: uint32(50),
LiquidatableSubaccountIds: []satypes.SubaccountId{},
NegativeTncSubaccountIds: []satypes.SubaccountId{},
SubaccountOpenPositionInfo: []clobtypes.SubaccountOpenPositionInfo{},
}
response := &api.LiquidateSubaccountsResponse{}
response = &api.LiquidateSubaccountsResponse{}
mck.On("LiquidateSubaccounts", ctx, req).Return(response, nil)
},
liquidatableSubaccountIds: []satypes.SubaccountId{},
@@ -527,10 +562,8 @@ func TestSendLiquidatableSubaccountIds(t *testing.T) {
"Errors are propagated": {
setupMocks: func(ctx context.Context, mck *mocks.QueryClient) {
req := &api.LiquidateSubaccountsRequest{
BlockHeight: uint32(50),
LiquidatableSubaccountIds: []satypes.SubaccountId{},
NegativeTncSubaccountIds: []satypes.SubaccountId{},
SubaccountOpenPositionInfo: []clobtypes.SubaccountOpenPositionInfo{},
BlockHeight: uint32(50),
LiquidatableSubaccountIds: []satypes.SubaccountId{},
}
mck.On("LiquidateSubaccounts", ctx, req).Return(nil, errors.New("test error"))
},
@@ -555,6 +588,7 @@ func TestSendLiquidatableSubaccountIds(t *testing.T) {
tc.liquidatableSubaccountIds,
tc.negativeTncSubaccountIds,
tc.subaccountOpenPositionInfo,
1000,
)
require.Equal(t, tc.expectedError, err)
})
1 change: 1 addition & 0 deletions protocol/daemons/liquidation/client/sub_task_runner.go
Original file line number Diff line number Diff line change
@@ -87,6 +87,7 @@ func (s *SubTaskRunnerImpl) RunLiquidationDaemonTaskLoop(
liquidatableSubaccountIds,
negativeTncSubaccountIds,
subaccountOpenPositionInfo,
liqFlags.ResponsePageLimit,
)
if err != nil {
return err
Loading