Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FNS changes on top of 5.2.x #2098

Merged
merged 31 commits into from
Aug 26, 2024
Merged
Changes from 1 commit
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
b232282
[CT-964] Send deleveraging events in grpc stream (#1903)
dydxwill Jul 12, 2024
d113cb7
gRPC streaming clean up (#1906)
jayy04 Jul 13, 2024
41c4057
Post only order breaks out of matching loop + Add post only crosses m…
jonfung-dydx Aug 1, 2024
ec44fce
GPRC Streaming change default flag options, break upon connection ord…
jonfung-dydx Aug 2, 2024
987134d
Full Node Status taker order status protos (#2003)
jonfung-dydx Aug 2, 2024
f627b04
Fix proto formatting (#2020)
jonfung-dydx Aug 2, 2024
5bf5e3d
FNS protos - add taker order in stream update oneof (#2021)
jonfung-dydx Aug 5, 2024
566282b
add subaccount support for grpc stream (#1992)
dydxwill Aug 6, 2024
460454f
Fix orders issue for deleveraging events (#1958)
dydxwill Jul 24, 2024
73d39d3
Restructure FNS global cache to be list (#2036)
dydxwill Aug 7, 2024
a7d0270
Bump grpc stream flag default values (#2051)
dydxwill Aug 7, 2024
8b6c3d2
Bump grpc stream flag default values (#2051)
dydxwill Aug 7, 2024
a4bed83
[Full node streaming] emit taker order status at end of matching loop…
jonfung-dydx Aug 8, 2024
c23eb39
fix merge conflict and metric emissions (#2065)
jonfung-dydx Aug 8, 2024
3da874b
Full node streaming batch size reset to 2000, properly zero out cache…
jonfung-dydx Aug 9, 2024
3f76b54
FNS subaccount implementation (#2059)
dydxwill Aug 12, 2024
6b8e014
[CT-1050] DeliverTx state change reset for subaccount updates (#2063)
dydxwill Aug 12, 2024
5297fbd
Fix snapshot bool (#2078)
dydxwill Aug 12, 2024
0fa54f4
Full Node Streaming Recurring snapshots (#2079)
jonfung-dydx Aug 13, 2024
e06739d
Remove todo (#2087)
dydxwill Aug 14, 2024
66e7d98
Add websocket support to full node streaming (#1908)
jayy04 Aug 14, 2024
b960fe4
Full Node Streaming default port 9092 (#2092)
jonfung-dydx Aug 15, 2024
1ee13ed
[CT-1103] FNS subaccount WS support (#2088)
dydxwill Aug 15, 2024
06f1012
fix merge conflicts (#2115)
dydxwill Aug 23, 2024
2e62083
modify gh wf
dydxwill Aug 26, 2024
8a0261f
paginate liquidation daemon response (backport #2118) (#2119)
mergify[bot] Aug 21, 2024
06cba16
add telemetry and logs for liquidation daemon (backport #2122) (#2123)
mergify[bot] Aug 21, 2024
b839bb3
skip liquidation task loop if last committed block height is the same…
mergify[bot] Aug 21, 2024
08be416
revert gh wf
dydxwill Aug 26, 2024
086e07c
Full node streaming remove minimum for snapshot interval flag (#2138)
jonfung-dydx Aug 22, 2024
a2bf192
Support empty params for websocket endpoint (#2111)
dydxwill Aug 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[Full node streaming] emit taker order status at end of matching loop (
jonfung-dydx committed Aug 15, 2024
commit a4bed83b9c40d14762e85203332cbdfa45596929
5 changes: 5 additions & 0 deletions protocol/mocks/MemClobKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
@@ -396,6 +396,34 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates(
sm.AddUpdatesToCache(streamUpdates, clobPairIds, uint32(len(orderbookFills)))
}

// SendTakerOrderStatus sends out a taker order and its status to the full node streaming service.
func (sm *FullNodeStreamingManagerImpl) SendTakerOrderStatus(
streamTakerOrder clobtypes.StreamTakerOrder,
blockHeight uint32,
execMode sdk.ExecMode,
) {
clobPairId := uint32(0)
if liqOrder := streamTakerOrder.GetLiquidationOrder(); liqOrder != nil {
clobPairId = liqOrder.ClobPairId
}
if takerOrder := streamTakerOrder.GetOrder(); takerOrder != nil {
clobPairId = takerOrder.OrderId.ClobPairId
}

sm.AddUpdatesToCache(
map[uint32][]clobtypes.StreamUpdate{
clobPairId: {
{
UpdateMessage: &clobtypes.StreamUpdate_TakerOrder{
TakerOrder: &streamTakerOrder,
},
},
},
},
1,
)
}

func (sm *FullNodeStreamingManagerImpl) AddUpdatesToCache(
updates []clobtypes.StreamUpdate,
clobPairIds []uint32,
7 changes: 7 additions & 0 deletions protocol/streaming/noop_streaming_manager.go
Original file line number Diff line number Diff line change
@@ -42,6 +42,13 @@ func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdates(
) {
}

func (sm *NoopGrpcStreamingManager) SendTakerOrderStatus(
takerOrder clobtypes.StreamTakerOrder,
blockHeight uint32,
execMode sdk.ExecMode,
) {
}

func (sm *NoopGrpcStreamingManager) InitializeNewStreams(
getOrderbookSnapshot func(clobPairId clobtypes.ClobPairId) *clobtypes.OffchainUpdates,
blockHeight uint32,
5 changes: 5 additions & 0 deletions protocol/streaming/types/interface.go
Original file line number Diff line number Diff line change
@@ -34,6 +34,11 @@ type FullNodeStreamingManager interface {
execMode sdk.ExecMode,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
)
SendTakerOrderStatus(
takerOrder clobtypes.StreamTakerOrder,
blockHeight uint32,
execMode sdk.ExecMode,
)
}

type OutgoingMessageSender interface {
15 changes: 15 additions & 0 deletions protocol/testutil/memclob/keeper.go
Original file line number Diff line number Diff line change
@@ -512,3 +512,18 @@ func (f *FakeMemClobKeeper) SendOrderbookFillUpdates(
orderbookFills []types.StreamOrderbookFill,
) {
}

func (f *FakeMemClobKeeper) SendTakerOrderStatus(
ctx sdk.Context,
takerOrder types.StreamTakerOrder,
) {
}

// Placeholder to satisfy interface implementation of types.MemClobKeeper
func (f *FakeMemClobKeeper) AddOrderToOrderbookSubaccountUpdatesCheck(
ctx sdk.Context,
subaccountId satypes.SubaccountId,
order types.PendingOpenOrder,
) satypes.UpdateResult {
return satypes.Success
}
16 changes: 14 additions & 2 deletions protocol/x/clob/keeper/keeper.go
Original file line number Diff line number Diff line change
@@ -272,7 +272,7 @@ func (k Keeper) InitializeNewStreams(ctx sdk.Context) {
)
}

// SendOrderbookUpdates sends the offchain updates to the gRPC streaming manager.
// SendOrderbookUpdates sends the offchain updates to the Full Node streaming manager.
func (k Keeper) SendOrderbookUpdates(
ctx sdk.Context,
offchainUpdates *types.OffchainUpdates,
@@ -288,7 +288,7 @@ func (k Keeper) SendOrderbookUpdates(
)
}

// SendOrderbookFillUpdates sends the orderbook fills to the gRPC streaming manager.
// SendOrderbookFillUpdates sends the orderbook fills to the Full Node streaming manager.
func (k Keeper) SendOrderbookFillUpdates(
ctx sdk.Context,
orderbookFills []types.StreamOrderbookFill,
@@ -303,3 +303,15 @@ func (k Keeper) SendOrderbookFillUpdates(
k.PerpetualIdToClobPairId,
)
}

// SendTakerOrderStatus sends the taker order with its status to the Full Node streaming manager.
func (k Keeper) SendTakerOrderStatus(
ctx sdk.Context,
takerOrder types.StreamTakerOrder,
) {
k.GetFullNodeStreamingManager().SendTakerOrderStatus(
takerOrder,
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
)
}
12 changes: 12 additions & 0 deletions protocol/x/clob/memclob/memclob.go
Original file line number Diff line number Diff line change
@@ -768,6 +768,18 @@ func (m *MemClobPriceTimePriority) matchOrder(
order,
)

// If full node streaming is on, emit the taker order and its resulting status.
if m.generateOrderbookUpdates {
streamTakerOrder := m.GenerateStreamTakerOrder(
order,
takerOrderStatus,
)
m.clobKeeper.SendTakerOrderStatus(
ctx,
streamTakerOrder,
)
}

// If this is a replacement order, then ensure we remove the existing order from the orderbook.
if !order.IsLiquidation() {
orderId := order.MustGetOrder().OrderId
25 changes: 25 additions & 0 deletions protocol/x/clob/memclob/memclob_grpc_streaming.go
Original file line number Diff line number Diff line change
@@ -157,3 +157,28 @@ func (m *MemClobPriceTimePriority) GetOrderbookUpdatesForOrderUpdate(
}
return offchainUpdates
}

// GenerateStreamTakerOrder returns a `StreamTakerOrder` object used in full node
// streaming from a matchableOrder and a taker order status.
func (m *MemClobPriceTimePriority) GenerateStreamTakerOrder(
takerOrder types.MatchableOrder,
takerOrderStatus types.TakerOrderStatus,
) types.StreamTakerOrder {
if takerOrder.IsLiquidation() {
liquidationOrder := takerOrder.MustGetLiquidationOrder()
streamLiquidationOrder := liquidationOrder.ToStreamLiquidationOrder()
return types.StreamTakerOrder{
TakerOrder: &types.StreamTakerOrder_LiquidationOrder{
LiquidationOrder: streamLiquidationOrder,
},
TakerOrderStatus: takerOrderStatus.ToStreamingTakerOrderStatus(),
}
}
order := takerOrder.MustGetOrder()
return types.StreamTakerOrder{
TakerOrder: &types.StreamTakerOrder_Order{
Order: &order,
},
TakerOrderStatus: takerOrderStatus.ToStreamingTakerOrderStatus(),
}
}
6 changes: 6 additions & 0 deletions protocol/x/clob/types/liquidation_order.go
Original file line number Diff line number Diff line change
@@ -115,6 +115,12 @@ func (lo *LiquidationOrder) MustGetOrder() Order {
panic("MustGetOrder: No underlying order on a LiquidationOrder type.")
}

// MustGetLiquidationOrder returns the underlying `LiquidationOrder` type.
// This function is necessary for the `LiquidationOrder` type to implement the `MatchableOrder` interface.
func (lo *LiquidationOrder) MustGetLiquidationOrder() LiquidationOrder {
return *lo
}

// MustGetLiquidatedPerpetualId returns the perpetual ID that this perpetual order is liquidating.
// This function is necessary for the `LiquidationOrder` type to implement the `MatchableOrder` interface.
func (lo *LiquidationOrder) MustGetLiquidatedPerpetualId() uint32 {
9 changes: 9 additions & 0 deletions protocol/x/clob/types/mem_clob_keeper.go
Original file line number Diff line number Diff line change
@@ -109,4 +109,13 @@ type MemClobKeeper interface {
ctx sdk.Context,
orderbookFills []StreamOrderbookFill,
)
SendTakerOrderStatus(
ctx sdk.Context,
takerOrder StreamTakerOrder,
)
AddOrderToOrderbookSubaccountUpdatesCheck(
ctx sdk.Context,
subaccountId satypes.SubaccountId,
order PendingOpenOrder,
) satypes.UpdateResult
}
6 changes: 6 additions & 0 deletions protocol/x/clob/types/order.go
Original file line number Diff line number Diff line change
@@ -132,6 +132,12 @@ func (o *Order) MustGetOrder() Order {
return *o
}

// MustGetLiquidationOrder always panics since Order is not a Liquidation Order.
// This function is necessary for the `Order` type to implement the `MatchableOrder` interface.
func (o *Order) MustGetLiquidationOrder() LiquidationOrder {
panic("MustGetLiquidationOrder: Order is not a liquidation order")
}

// MustGetLiquidatedPerpetualId always panics since there is no underlying perpetual ID for a `Order`.
// This function is necessary for the `Order` type to implement the `MatchableOrder` interface.
func (o *Order) MustGetLiquidatedPerpetualId() uint32 {
3 changes: 3 additions & 0 deletions protocol/x/clob/types/orderbook.go
Original file line number Diff line number Diff line change
@@ -257,6 +257,9 @@ type MatchableOrder interface {
// MustGetOrder returns the underlying order if this is not a liquidation order. Panics if called
// for a liquidation order.
MustGetOrder() Order
// MustGetLiquidationOrder returns the underlying liquidation order if this is not a regular order.
// Panics if called for a regular order.
MustGetLiquidationOrder() LiquidationOrder
// MustGetLiquidatedPerpetualId returns the perpetual ID if this is a liquidation order. Panics
// if called for a non-liquidation order.
MustGetLiquidatedPerpetualId() uint32