Skip to content

Commit

Permalink
[Full node streaming] emit taker order status at end of matching loop (
Browse files Browse the repository at this point in the history
  • Loading branch information
jonfung-dydx authored Aug 8, 2024
1 parent ac64d4a commit 2d5dfa5
Show file tree
Hide file tree
Showing 12 changed files with 121 additions and 2 deletions.
5 changes: 5 additions & 0 deletions protocol/mocks/MemClobKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,34 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates(
sm.AddUpdatesToCache(streamUpdates, clobPairIds, uint32(len(orderbookFills)))
}

// SendTakerOrderStatus sends out a taker order and its status to the full node streaming service.
func (sm *FullNodeStreamingManagerImpl) SendTakerOrderStatus(
streamTakerOrder clobtypes.StreamTakerOrder,
blockHeight uint32,
execMode sdk.ExecMode,
) {
clobPairId := uint32(0)
if liqOrder := streamTakerOrder.GetLiquidationOrder(); liqOrder != nil {
clobPairId = liqOrder.ClobPairId
}
if takerOrder := streamTakerOrder.GetOrder(); takerOrder != nil {
clobPairId = takerOrder.OrderId.ClobPairId
}

sm.AddUpdatesToCache(
map[uint32][]clobtypes.StreamUpdate{
clobPairId: {
{
UpdateMessage: &clobtypes.StreamUpdate_TakerOrder{
TakerOrder: &streamTakerOrder,
},
},
},
},
1,
)
}

func (sm *FullNodeStreamingManagerImpl) AddUpdatesToCache(
updates []clobtypes.StreamUpdate,
clobPairIds []uint32,
Expand Down
7 changes: 7 additions & 0 deletions protocol/streaming/noop_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdates(
) {
}

func (sm *NoopGrpcStreamingManager) SendTakerOrderStatus(
takerOrder clobtypes.StreamTakerOrder,
blockHeight uint32,
execMode sdk.ExecMode,
) {
}

func (sm *NoopGrpcStreamingManager) InitializeNewStreams(
getOrderbookSnapshot func(clobPairId clobtypes.ClobPairId) *clobtypes.OffchainUpdates,
blockHeight uint32,
Expand Down
5 changes: 5 additions & 0 deletions protocol/streaming/types/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ type FullNodeStreamingManager interface {
execMode sdk.ExecMode,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
)
SendTakerOrderStatus(
takerOrder clobtypes.StreamTakerOrder,
blockHeight uint32,
execMode sdk.ExecMode,
)
}

type OutgoingMessageSender interface {
Expand Down
6 changes: 6 additions & 0 deletions protocol/testutil/memclob/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,12 @@ func (f *FakeMemClobKeeper) SendOrderbookFillUpdates(
) {
}

func (f *FakeMemClobKeeper) SendTakerOrderStatus(
ctx sdk.Context,
takerOrder types.StreamTakerOrder,
) {
}

// Placeholder to satisfy interface implementation of types.MemClobKeeper
func (f *FakeMemClobKeeper) AddOrderToOrderbookSubaccountUpdatesCheck(
ctx sdk.Context,
Expand Down
16 changes: 14 additions & 2 deletions protocol/x/clob/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (k Keeper) InitializeNewStreams(ctx sdk.Context) {
)
}

// SendOrderbookUpdates sends the offchain updates to the gRPC streaming manager.
// SendOrderbookUpdates sends the offchain updates to the Full Node streaming manager.
func (k Keeper) SendOrderbookUpdates(
ctx sdk.Context,
offchainUpdates *types.OffchainUpdates,
Expand All @@ -283,7 +283,7 @@ func (k Keeper) SendOrderbookUpdates(
)
}

// SendOrderbookFillUpdates sends the orderbook fills to the gRPC streaming manager.
// SendOrderbookFillUpdates sends the orderbook fills to the Full Node streaming manager.
func (k Keeper) SendOrderbookFillUpdates(
ctx sdk.Context,
orderbookFills []types.StreamOrderbookFill,
Expand All @@ -298,3 +298,15 @@ func (k Keeper) SendOrderbookFillUpdates(
k.PerpetualIdToClobPairId,
)
}

// SendTakerOrderStatus sends the taker order with its status to the Full Node streaming manager.
func (k Keeper) SendTakerOrderStatus(
ctx sdk.Context,
takerOrder types.StreamTakerOrder,
) {
k.GetFullNodeStreamingManager().SendTakerOrderStatus(
takerOrder,
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
)
}
12 changes: 12 additions & 0 deletions protocol/x/clob/memclob/memclob.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,6 +767,18 @@ func (m *MemClobPriceTimePriority) matchOrder(
order,
)

// If full node streaming is on, emit the taker order and its resulting status.
if m.generateOrderbookUpdates {
streamTakerOrder := m.GenerateStreamTakerOrder(
order,
takerOrderStatus,
)
m.clobKeeper.SendTakerOrderStatus(
ctx,
streamTakerOrder,
)
}

// If this is a replacement order, then ensure we remove the existing order from the orderbook.
if !order.IsLiquidation() {
orderId := order.MustGetOrder().OrderId
Expand Down
25 changes: 25 additions & 0 deletions protocol/x/clob/memclob/memclob_grpc_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,28 @@ func (m *MemClobPriceTimePriority) GetOrderbookUpdatesForOrderUpdate(
}
return offchainUpdates
}

// GenerateStreamTakerOrder returns a `StreamTakerOrder` object used in full node
// streaming from a matchableOrder and a taker order status.
func (m *MemClobPriceTimePriority) GenerateStreamTakerOrder(
takerOrder types.MatchableOrder,
takerOrderStatus types.TakerOrderStatus,
) types.StreamTakerOrder {
if takerOrder.IsLiquidation() {
liquidationOrder := takerOrder.MustGetLiquidationOrder()
streamLiquidationOrder := liquidationOrder.ToStreamLiquidationOrder()
return types.StreamTakerOrder{
TakerOrder: &types.StreamTakerOrder_LiquidationOrder{
LiquidationOrder: streamLiquidationOrder,
},
TakerOrderStatus: takerOrderStatus.ToStreamingTakerOrderStatus(),
}
}
order := takerOrder.MustGetOrder()
return types.StreamTakerOrder{
TakerOrder: &types.StreamTakerOrder_Order{
Order: &order,
},
TakerOrderStatus: takerOrderStatus.ToStreamingTakerOrderStatus(),
}
}
6 changes: 6 additions & 0 deletions protocol/x/clob/types/liquidation_order.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ func (lo *LiquidationOrder) MustGetOrder() Order {
panic("MustGetOrder: No underlying order on a LiquidationOrder type.")
}

// MustGetLiquidationOrder returns the underlying `LiquidationOrder` type.
// This function is necessary for the `LiquidationOrder` type to implement the `MatchableOrder` interface.
func (lo *LiquidationOrder) MustGetLiquidationOrder() LiquidationOrder {
return *lo
}

// MustGetLiquidatedPerpetualId returns the perpetual ID that this perpetual order is liquidating.
// This function is necessary for the `LiquidationOrder` type to implement the `MatchableOrder` interface.
func (lo *LiquidationOrder) MustGetLiquidatedPerpetualId() uint32 {
Expand Down
4 changes: 4 additions & 0 deletions protocol/x/clob/types/mem_clob_keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ type MemClobKeeper interface {
ctx sdk.Context,
orderbookFills []StreamOrderbookFill,
)
SendTakerOrderStatus(
ctx sdk.Context,
takerOrder StreamTakerOrder,
)
AddOrderToOrderbookSubaccountUpdatesCheck(
ctx sdk.Context,
subaccountId satypes.SubaccountId,
Expand Down
6 changes: 6 additions & 0 deletions protocol/x/clob/types/order.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ func (o *Order) MustGetOrder() Order {
return *o
}

// MustGetLiquidationOrder always panics since Order is not a Liquidation Order.
// This function is necessary for the `Order` type to implement the `MatchableOrder` interface.
func (o *Order) MustGetLiquidationOrder() LiquidationOrder {
panic("MustGetLiquidationOrder: Order is not a liquidation order")
}

// MustGetLiquidatedPerpetualId always panics since there is no underlying perpetual ID for a `Order`.
// This function is necessary for the `Order` type to implement the `MatchableOrder` interface.
func (o *Order) MustGetLiquidatedPerpetualId() uint32 {
Expand Down
3 changes: 3 additions & 0 deletions protocol/x/clob/types/orderbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ type MatchableOrder interface {
// MustGetOrder returns the underlying order if this is not a liquidation order. Panics if called
// for a liquidation order.
MustGetOrder() Order
// MustGetLiquidationOrder returns the underlying liquidation order if this is not a regular order.
// Panics if called for a regular order.
MustGetLiquidationOrder() LiquidationOrder
// MustGetLiquidatedPerpetualId returns the perpetual ID if this is a liquidation order. Panics
// if called for a non-liquidation order.
MustGetLiquidatedPerpetualId() uint32
Expand Down

0 comments on commit 2d5dfa5

Please sign in to comment.