From ee909706fd3ffeb3ba7b4b3324322297bb84ac39 Mon Sep 17 00:00:00 2001 From: Teddy Ding Date: Fri, 27 Sep 2024 18:14:00 -0400 Subject: [PATCH 1/3] Internalize logic to stage FinalizeBlock events --- .../codegen/dydxprotocol/clob/streaming.ts | 16 +- proto/dydxprotocol/clob/streaming.proto | 1 + protocol/mocks/MemClobKeeper.go | 6 +- .../streaming/full_node_streaming_manager.go | 270 ++++++++++-------- protocol/streaming/noop_streaming_manager.go | 18 +- protocol/streaming/types/interface.go | 16 +- protocol/streaming/util/util.go | 8 +- protocol/testutil/memclob/keeper.go | 4 +- protocol/x/clob/keeper/keeper.go | 19 +- protocol/x/clob/keeper/process_operations.go | 18 +- protocol/x/clob/memclob/memclob.go | 2 +- protocol/x/clob/types/mem_clob_keeper.go | 4 +- protocol/x/clob/types/streaming.pb.go | 100 ++++++- protocol/x/subaccounts/keeper/subaccount.go | 2 +- 14 files changed, 297 insertions(+), 187 deletions(-) diff --git a/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/streaming.ts b/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/streaming.ts index dab8f1e122..1600c2e39c 100644 --- a/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/streaming.ts +++ b/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/streaming.ts @@ -1,4 +1,4 @@ -import { StreamOrderbookFill, StreamOrderbookFillSDKType } from "./query"; +import { StreamOrderbookFill, StreamOrderbookFillSDKType, StreamOrderbookUpdate, StreamOrderbookUpdateSDKType } from "./query"; import { StreamSubaccountUpdate, StreamSubaccountUpdateSDKType } from "../subaccounts/streaming"; import * as _m0 from "protobufjs/minimal"; import { DeepPartial } from "../../helpers"; @@ -7,18 +7,21 @@ import { DeepPartial } from "../../helpers"; export interface StagedFinalizeBlockEvent { orderFill?: StreamOrderbookFill; subaccountUpdate?: StreamSubaccountUpdate; + orderbookUpdate?: StreamOrderbookUpdate; } /** StagedFinalizeBlockEvent is an event staged during `FinalizeBlock`. */ export interface StagedFinalizeBlockEventSDKType { order_fill?: StreamOrderbookFillSDKType; subaccount_update?: StreamSubaccountUpdateSDKType; + orderbook_update?: StreamOrderbookUpdateSDKType; } function createBaseStagedFinalizeBlockEvent(): StagedFinalizeBlockEvent { return { orderFill: undefined, - subaccountUpdate: undefined + subaccountUpdate: undefined, + orderbookUpdate: undefined }; } @@ -32,6 +35,10 @@ export const StagedFinalizeBlockEvent = { StreamSubaccountUpdate.encode(message.subaccountUpdate, writer.uint32(18).fork()).ldelim(); } + if (message.orderbookUpdate !== undefined) { + StreamOrderbookUpdate.encode(message.orderbookUpdate, writer.uint32(26).fork()).ldelim(); + } + return writer; }, @@ -52,6 +59,10 @@ export const StagedFinalizeBlockEvent = { message.subaccountUpdate = StreamSubaccountUpdate.decode(reader, reader.uint32()); break; + case 3: + message.orderbookUpdate = StreamOrderbookUpdate.decode(reader, reader.uint32()); + break; + default: reader.skipType(tag & 7); break; @@ -65,6 +76,7 @@ export const StagedFinalizeBlockEvent = { const message = createBaseStagedFinalizeBlockEvent(); message.orderFill = object.orderFill !== undefined && object.orderFill !== null ? StreamOrderbookFill.fromPartial(object.orderFill) : undefined; message.subaccountUpdate = object.subaccountUpdate !== undefined && object.subaccountUpdate !== null ? StreamSubaccountUpdate.fromPartial(object.subaccountUpdate) : undefined; + message.orderbookUpdate = object.orderbookUpdate !== undefined && object.orderbookUpdate !== null ? StreamOrderbookUpdate.fromPartial(object.orderbookUpdate) : undefined; return message; } diff --git a/proto/dydxprotocol/clob/streaming.proto b/proto/dydxprotocol/clob/streaming.proto index 06c74ffbe1..ae3811134e 100644 --- a/proto/dydxprotocol/clob/streaming.proto +++ b/proto/dydxprotocol/clob/streaming.proto @@ -12,5 +12,6 @@ message StagedFinalizeBlockEvent { oneof event { StreamOrderbookFill order_fill = 1; dydxprotocol.subaccounts.StreamSubaccountUpdate subaccount_update = 2; + StreamOrderbookUpdate orderbook_update = 3; } } diff --git a/protocol/mocks/MemClobKeeper.go b/protocol/mocks/MemClobKeeper.go index 12a2f8cff3..5d8d68d42f 100644 --- a/protocol/mocks/MemClobKeeper.go +++ b/protocol/mocks/MemClobKeeper.go @@ -415,9 +415,9 @@ func (_m *MemClobKeeper) ReplayPlaceOrder(ctx types.Context, msg *clobtypes.MsgP return r0, r1, r2, r3 } -// SendOrderbookFillUpdates provides a mock function with given fields: ctx, orderbookFills -func (_m *MemClobKeeper) SendOrderbookFillUpdates(ctx types.Context, orderbookFills []clobtypes.StreamOrderbookFill) { - _m.Called(ctx, orderbookFills) +// SendOrderbookFillUpdate provides a mock function with given fields: ctx, orderbookFills +func (_m *MemClobKeeper) SendOrderbookFillUpdate(ctx types.Context, orderbookFill clobtypes.StreamOrderbookFill) { + _m.Called(ctx, orderbookFill) } // SendOrderbookUpdates provides a mock function with given fields: ctx, offchainUpdates diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index c43a254e78..ba1d550c3d 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -20,6 +20,8 @@ import ( "github.com/dydxprotocol/v4-chain/protocol/streaming/types" streaming_util "github.com/dydxprotocol/v4-chain/protocol/streaming/util" clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" + + ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types" ) var _ types.FullNodeStreamingManager = (*FullNodeStreamingManagerImpl)(nil) @@ -313,10 +315,7 @@ func toOrderbookStreamUpdate( blockHeight uint32, execMode sdk.ExecMode, ) []clobtypes.StreamUpdate { - v1updates, err := streaming_util.GetOffchainUpdatesV1(offchainUpdates) - if err != nil { - panic(err) - } + v1updates := streaming_util.GetOffchainUpdatesV1(offchainUpdates) return []clobtypes.StreamUpdate{ { UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ @@ -391,38 +390,21 @@ func getStagedEventsCount(store storetypes.KVStore) uint32 { } // Stage a subaccount update event in transient store, during `FinalizeBlock`. -func (sm *FullNodeStreamingManagerImpl) StageFinalizeBlockSubaccountUpdate( +func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate( ctx sdk.Context, subaccountUpdate satypes.StreamSubaccountUpdate, ) { - lib.AssertDeliverTxMode(ctx) - stagedEvent := clobtypes.StagedFinalizeBlockEvent{ - Event: &clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate{ - SubaccountUpdate: &subaccountUpdate, - }, + // If not `DeliverTx`, return since we don't stream optimistic subaccount updates. + if !lib.IsDeliverTxMode(ctx) { + return } - sm.stageFinalizeBlockEvent( - ctx, - sm.cdc.MustMarshal(&stagedEvent), - ) -} -// Stage a fill event in transient store, during `FinalizeBlock`. -// Since `FinalizeBlock` code block can be called more than once with optimistic -// execution (once optimistically and optionally once on the canonical block), -// we need to stage the events in transient store and later emit them -// during `Precommit`. -func (sm *FullNodeStreamingManagerImpl) StageFinalizeBlockFill( - ctx sdk.Context, - fill clobtypes.StreamOrderbookFill, -) { - lib.AssertDeliverTxMode(ctx) + // If `DeliverTx`, updates should be staged to be streamed after consensus finalizese on a block. stagedEvent := clobtypes.StagedFinalizeBlockEvent{ - Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{ - OrderFill: &fill, + Event: &clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate{ + SubaccountUpdate: &subaccountUpdate, }, } - sm.stageFinalizeBlockEvent( ctx, sm.cdc.MustMarshal(&stagedEvent), @@ -501,28 +483,37 @@ func (sm *FullNodeStreamingManagerImpl) TracksSubaccountId(subaccountId satypes. } func getStreamUpdatesFromOffchainUpdates( - offchainUpdates *clobtypes.OffchainUpdates, + v1updates []ocutypes.OffChainUpdateV1, blockHeight uint32, execMode sdk.ExecMode, ) (streamUpdates []clobtypes.StreamUpdate, clobPairIds []uint32) { // Group updates by clob pair id. - updates := make(map[uint32]*clobtypes.OffchainUpdates) - for _, message := range offchainUpdates.Messages { - clobPairId := message.OrderId.ClobPairId - if _, ok := updates[clobPairId]; !ok { - updates[clobPairId] = clobtypes.NewOffchainUpdates() + clobPairIdToV1Updates := make(map[uint32][]ocutypes.OffChainUpdateV1) + for _, v1update := range v1updates { + var clobPairId uint32 + switch u := v1update.UpdateMessage.(type) { + case *ocutypes.OffChainUpdateV1_OrderPlace: + clobPairId = u.OrderPlace.Order.OrderId.ClobPairId + case *ocutypes.OffChainUpdateV1_OrderReplace: + clobPairId = u.OrderReplace.OldOrderId.ClobPairId + case *ocutypes.OffChainUpdateV1_OrderRemove: + clobPairId = u.OrderRemove.RemovedOrderId.ClobPairId + case *ocutypes.OffChainUpdateV1_OrderUpdate: + clobPairId = u.OrderUpdate.OrderId.ClobPairId + default: + panic(fmt.Sprintf("Unhandled UpdateMessage type: %v", u)) } - updates[clobPairId].Messages = append(updates[clobPairId].Messages, message) + + if _, ok := clobPairIdToV1Updates[clobPairId]; !ok { + clobPairIdToV1Updates[clobPairId] = []ocutypes.OffChainUpdateV1{} + } + clobPairIdToV1Updates[clobPairId] = append(clobPairIdToV1Updates[clobPairId], v1update) } // Unmarshal each per-clob pair message to v1 updates. streamUpdates = make([]clobtypes.StreamUpdate, 0) clobPairIds = make([]uint32, 0) - for clobPairId, update := range updates { - v1updates, err := streaming_util.GetOffchainUpdatesV1(update) - if err != nil { - panic(err) - } + for clobPairId, v1updates := range clobPairIdToV1Updates { streamUpdate := clobtypes.StreamUpdate{ UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ @@ -545,17 +536,36 @@ func getStreamUpdatesFromOffchainUpdates( func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates( offchainUpdates *clobtypes.OffchainUpdates, blockHeight uint32, - execMode sdk.ExecMode, + ctx sdk.Context, ) { - defer metrics.ModuleMeasureSince( - metrics.FullNodeGrpc, - metrics.GrpcSendOrderbookUpdatesLatency, - time.Now(), - ) + v1updates := streaming_util.GetOffchainUpdatesV1(offchainUpdates) + + // If not `DeliverTx`, then updates are optimistic. Stream them directly. + if !lib.IsDeliverTxMode(ctx) { + defer metrics.ModuleMeasureSince( + metrics.FullNodeGrpc, + metrics.GrpcSendOrderbookUpdatesLatency, + time.Now(), + ) - streamUpdates, clobPairIds := getStreamUpdatesFromOffchainUpdates(offchainUpdates, blockHeight, execMode) + streamUpdates, clobPairIds := getStreamUpdatesFromOffchainUpdates(v1updates, blockHeight, ctx.ExecMode()) + sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds) + return + } - sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds) + // If `DeliverTx`, updates should be staged to be streamed after consensus finalizese on a block. + stagedEvent := clobtypes.StagedFinalizeBlockEvent{ + Event: &clobtypes.StagedFinalizeBlockEvent_OrderbookUpdate{ + OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ + Updates: v1updates, + Snapshot: false, + }, + }, + } + sm.stageFinalizeBlockEvent( + ctx, + sm.cdc.MustMarshal(&stagedEvent), + ) } func (sm *FullNodeStreamingManagerImpl) getStreamUpdatesForOrderbookFills( @@ -595,36 +605,54 @@ func (sm *FullNodeStreamingManagerImpl) getStreamUpdatesForOrderbookFills( return streamUpdates, clobPairIds } -// SendOrderbookFillUpdates groups fills by their clob pair ids and +// SendOrderbookFillUpdate groups fills by their clob pair ids and // sends messages to the subscribers. -func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates( - orderbookFills []clobtypes.StreamOrderbookFill, +func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdate( + orderbookFill clobtypes.StreamOrderbookFill, blockHeight uint32, - execMode sdk.ExecMode, + ctx sdk.Context, perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, ) { - defer metrics.ModuleMeasureSince( - metrics.FullNodeGrpc, - metrics.GrpcSendOrderbookFillsLatency, - time.Now(), - ) + // If not `DeliverTx`, then updates are optimistic. Stream them directly. + if !lib.IsDeliverTxMode(ctx) { + defer metrics.ModuleMeasureSince( + metrics.FullNodeGrpc, + metrics.GrpcSendOrderbookFillsLatency, + time.Now(), + ) - streamUpdates, clobPairIds := sm.getStreamUpdatesForOrderbookFills( - orderbookFills, - blockHeight, - execMode, - perpetualIdToClobPairId, - ) + streamUpdates, clobPairIds := sm.getStreamUpdatesForOrderbookFills( + []clobtypes.StreamOrderbookFill{orderbookFill}, + blockHeight, + ctx.ExecMode(), + perpetualIdToClobPairId, + ) + sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds) + return + } + + // If `DeliverTx`, updates should be staged to be streamed after consensus finalizese on a block. + stagedEvent := clobtypes.StagedFinalizeBlockEvent{ + Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{ + OrderFill: &orderbookFill, + }, + } - sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds) + sm.stageFinalizeBlockEvent( + ctx, + sm.cdc.MustMarshal(&stagedEvent), + ) } // SendTakerOrderStatus sends out a taker order and its status to the full node streaming service. func (sm *FullNodeStreamingManagerImpl) SendTakerOrderStatus( streamTakerOrder clobtypes.StreamTakerOrder, blockHeight uint32, - execMode sdk.ExecMode, + ctx sdk.Context, ) { + // In current design, we never send this during `DeliverTx` (`FinalizeBlock`). + lib.AssertCheckTxMode(ctx) + clobPairId := uint32(0) if liqOrder := streamTakerOrder.GetLiquidationOrder(); liqOrder != nil { clobPairId = liqOrder.ClobPairId @@ -640,7 +668,7 @@ func (sm *FullNodeStreamingManagerImpl) SendTakerOrderStatus( TakerOrder: &streamTakerOrder, }, BlockHeight: blockHeight, - ExecMode: uint32(execMode), + ExecMode: uint32(ctx.ExecMode()), }, }, []uint32{clobPairId}, @@ -712,13 +740,7 @@ func (sm *FullNodeStreamingManagerImpl) AddOrderUpdatesToCache( float32(len(updates)), ) - sm.streamUpdateCache = append(sm.streamUpdateCache, updates...) - for _, clobPairId := range clobPairIds { - sm.streamUpdateSubscriptionCache = append( - sm.streamUpdateSubscriptionCache, - sm.clobPairIdToSubscriptionIdMapping[clobPairId], - ) - } + sm.cacheStreamUpdatesByClobPairWithLock(updates, clobPairIds) // Remove all subscriptions and wipe the buffer if buffer overflows. sm.RemoveSubscriptionsAndClearBufferIfFull() @@ -739,13 +761,8 @@ func (sm *FullNodeStreamingManagerImpl) AddSubaccountUpdatesToCache( float32(len(updates)), ) - sm.streamUpdateCache = append(sm.streamUpdateCache, updates...) - for _, subaccountId := range subaccountIds { - sm.streamUpdateSubscriptionCache = append( - sm.streamUpdateSubscriptionCache, - sm.subaccountIdToSubscriptionIdMapping[*subaccountId], - ) - } + sm.cacheStreamUpdatesBySubaccountWithLock(updates, subaccountIds) + sm.RemoveSubscriptionsAndClearBufferIfFull() sm.EmitMetrics() } @@ -850,38 +867,31 @@ func (sm *FullNodeStreamingManagerImpl) GetSubaccountSnapshotsForInitStreams( return ret } -// addBatchUpdatesToCacheWithLock adds batched updates to the cache. -// Used by `StreamBatchUpdatesAfterFinalizeBlock` to batch orderbook, fill -// and subaccount updates in a single stream. -// Note this method requires the lock and assumes that the lock has already been +// cacheStreamUpdatesByClobPairWithLock adds stream updates to cache, +// and store corresponding clob pair Ids. +// This method requires the lock and assumes that the lock has already been // acquired by the caller. -func (sm *FullNodeStreamingManagerImpl) addBatchUpdatesToCacheWithLock( - orderbookStreamUpdates []clobtypes.StreamUpdate, - orderbookClobPairIds []uint32, - fillStreamUpdates []clobtypes.StreamUpdate, - fillClobPairIds []uint32, - subaccountStreamUpdates []clobtypes.StreamUpdate, - subaccountIds []*satypes.SubaccountId, +func (sm *FullNodeStreamingManagerImpl) cacheStreamUpdatesByClobPairWithLock( + streamUpdates []clobtypes.StreamUpdate, + clobPairIds []uint32, ) { - // Add orderbook updates to cache. - sm.streamUpdateCache = append(sm.streamUpdateCache, orderbookStreamUpdates...) - for _, clobPairId := range orderbookClobPairIds { - sm.streamUpdateSubscriptionCache = append( - sm.streamUpdateSubscriptionCache, - sm.clobPairIdToSubscriptionIdMapping[clobPairId], - ) - } - - // Add fill updates to cache. - sm.streamUpdateCache = append(sm.streamUpdateCache, fillStreamUpdates...) - for _, clobPairId := range fillClobPairIds { + sm.streamUpdateCache = append(sm.streamUpdateCache, streamUpdates...) + for _, clobPairId := range clobPairIds { sm.streamUpdateSubscriptionCache = append( sm.streamUpdateSubscriptionCache, sm.clobPairIdToSubscriptionIdMapping[clobPairId], ) } +} - // Add subaccount updates to cache. +// cacheStreamUpdatesBySubaccountWithLock adds subaccount stream updates to cache, +// and store corresponding subaccount Ids. +// This method requires the lock and assumes that the lock has already been +// acquired by the caller. +func (sm *FullNodeStreamingManagerImpl) cacheStreamUpdatesBySubaccountWithLock( + subaccountStreamUpdates []clobtypes.StreamUpdate, + subaccountIds []*satypes.SubaccountId, +) { sm.streamUpdateCache = append(sm.streamUpdateCache, subaccountStreamUpdates...) for _, subaccountId := range subaccountIds { sm.streamUpdateSubscriptionCache = append( @@ -902,41 +912,50 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock( // Prevent gas metering from state read. ctx = ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter()) - finalizedFills, finalizedSubaccountUpdates := sm.getStagedEventsFromFinalizeBlock(ctx) + finalizedFills, + finalizedSubaccountUpdates, + finalizedOrderbookUpdates := sm.getStagedEventsFromFinalizeBlock(ctx) + + sm.Lock() + defer sm.Unlock() + + // Flush all pending updates, since we want the onchain updates to arrive in a batch. + sm.FlushStreamUpdatesWithLock() - orderbookStreamUpdates, orderbookClobPairIds := getStreamUpdatesFromOffchainUpdates( - orderBookUpdatesToSyncLocalOpsQueue, + // Cache updates to sync local ops queue + sycnLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates( + streaming_util.GetOffchainUpdatesV1(orderBookUpdatesToSyncLocalOpsQueue), uint32(ctx.BlockHeight()), ctx.ExecMode(), ) + sm.cacheStreamUpdatesByClobPairWithLock(sycnLocalUpdates, syncLocalClobPairIds) + // Cache updates for finalized fills. fillStreamUpdates, fillClobPairIds := sm.getStreamUpdatesForOrderbookFills( finalizedFills, uint32(ctx.BlockHeight()), ctx.ExecMode(), perpetualIdToClobPairId, ) + sm.cacheStreamUpdatesByClobPairWithLock(fillStreamUpdates, fillClobPairIds) + + // Cache updates for finalized orderbook updates (e.g. RemoveOrderFillAmount in `EndBlocker`). + for _, finalizedUpdate := range finalizedOrderbookUpdates { + streamUpdates, clobPairIds := getStreamUpdatesFromOffchainUpdates( + finalizedUpdate.Updates, + uint32(ctx.BlockHeight()), + ctx.ExecMode(), + ) + sm.cacheStreamUpdatesByClobPairWithLock(streamUpdates, clobPairIds) + } + // Finally, cache updates for finalized subaccount updates subaccountStreamUpdates, subaccountIds := getStreamUpdatesForSubaccountUpdates( finalizedSubaccountUpdates, uint32(ctx.BlockHeight()), ctx.ExecMode(), ) - - sm.Lock() - defer sm.Unlock() - - // Flush all pending updates, since we want the onchain updates to arrive in a batch. - sm.FlushStreamUpdatesWithLock() - - sm.addBatchUpdatesToCacheWithLock( - orderbookStreamUpdates, - orderbookClobPairIds, - fillStreamUpdates, - fillClobPairIds, - subaccountStreamUpdates, - subaccountIds, - ) + sm.cacheStreamUpdatesBySubaccountWithLock(subaccountStreamUpdates, subaccountIds) // Emit all stream updates in a single batch. // Note we still have the lock, which is released right before function returns. @@ -950,6 +969,7 @@ func (sm *FullNodeStreamingManagerImpl) getStagedEventsFromFinalizeBlock( ) ( finalizedFills []clobtypes.StreamOrderbookFill, finalizedSubaccountUpdates []satypes.StreamSubaccountUpdate, + finalizedOrderbookUpdates []clobtypes.StreamOrderbookUpdate, ) { // Get onchain stream events stored in transient store. stagedEvents := sm.GetStagedFinalizeBlockEvents(ctx) @@ -965,6 +985,10 @@ func (sm *FullNodeStreamingManagerImpl) getStagedEventsFromFinalizeBlock( finalizedFills = append(finalizedFills, *event.OrderFill) case *clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate: finalizedSubaccountUpdates = append(finalizedSubaccountUpdates, *event.SubaccountUpdate) + case *clobtypes.StagedFinalizeBlockEvent_OrderbookUpdate: + finalizedOrderbookUpdates = append(finalizedOrderbookUpdates, *event.OrderbookUpdate) + default: + panic(fmt.Sprintf("Unhandled staged event type: %v\n", stagedEvent.Event)) } } @@ -977,7 +1001,7 @@ func (sm *FullNodeStreamingManagerImpl) getStagedEventsFromFinalizeBlock( float32(len(finalizedFills)), ) - return finalizedFills, finalizedSubaccountUpdates + return finalizedFills, finalizedSubaccountUpdates, finalizedOrderbookUpdates } func (sm *FullNodeStreamingManagerImpl) InitializeNewStreams( diff --git a/protocol/streaming/noop_streaming_manager.go b/protocol/streaming/noop_streaming_manager.go index 4df60bc427..5358b9b098 100644 --- a/protocol/streaming/noop_streaming_manager.go +++ b/protocol/streaming/noop_streaming_manager.go @@ -32,14 +32,14 @@ func (sm *NoopGrpcStreamingManager) Subscribe( func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates( updates *clobtypes.OffchainUpdates, blockHeight uint32, - execMode sdk.ExecMode, + ctx sdk.Context, ) { } -func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdates( - orderbookFills []clobtypes.StreamOrderbookFill, +func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdate( + orderbookFill clobtypes.StreamOrderbookFill, blockHeight uint32, - execMode sdk.ExecMode, + ctx sdk.Context, perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, ) { } @@ -47,7 +47,7 @@ func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdates( func (sm *NoopGrpcStreamingManager) SendTakerOrderStatus( takerOrder clobtypes.StreamTakerOrder, blockHeight uint32, - execMode sdk.ExecMode, + ctx sdk.Context, ) { } @@ -79,19 +79,13 @@ func (sm *NoopGrpcStreamingManager) InitializeNewStreams( func (sm *NoopGrpcStreamingManager) Stop() { } -func (sm *NoopGrpcStreamingManager) StageFinalizeBlockFill( - ctx sdk.Context, - fill clobtypes.StreamOrderbookFill, -) { -} - func (sm *NoopGrpcStreamingManager) GetStagedFinalizeBlockEvents( ctx sdk.Context, ) []clobtypes.StagedFinalizeBlockEvent { return nil } -func (sm *NoopGrpcStreamingManager) StageFinalizeBlockSubaccountUpdate( +func (sm *NoopGrpcStreamingManager) SendSubaccountUpdate( ctx sdk.Context, subaccountUpdate satypes.StreamSubaccountUpdate, ) { diff --git a/protocol/streaming/types/interface.go b/protocol/streaming/types/interface.go index 0f097d3e75..cddaada7d7 100644 --- a/protocol/streaming/types/interface.go +++ b/protocol/streaming/types/interface.go @@ -32,29 +32,25 @@ type FullNodeStreamingManager interface { SendOrderbookUpdates( offchainUpdates *clobtypes.OffchainUpdates, blockHeight uint32, - execMode sdk.ExecMode, + ctx sdk.Context, ) - SendOrderbookFillUpdates( - orderbookFills []clobtypes.StreamOrderbookFill, + SendOrderbookFillUpdate( + orderbookFill clobtypes.StreamOrderbookFill, blockHeight uint32, - execMode sdk.ExecMode, + ctx sdk.Context, perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, ) SendTakerOrderStatus( takerOrder clobtypes.StreamTakerOrder, blockHeight uint32, - execMode sdk.ExecMode, + ctx sdk.Context, ) SendFinalizedSubaccountUpdates( subaccountUpdates []satypes.StreamSubaccountUpdate, blockHeight uint32, execMode sdk.ExecMode, ) - StageFinalizeBlockFill( - ctx sdk.Context, - fill clobtypes.StreamOrderbookFill, - ) - StageFinalizeBlockSubaccountUpdate( + SendSubaccountUpdate( ctx sdk.Context, subaccountUpdate satypes.StreamSubaccountUpdate, ) diff --git a/protocol/streaming/util/util.go b/protocol/streaming/util/util.go index 985a29ef33..bbf37e3340 100644 --- a/protocol/streaming/util/util.go +++ b/protocol/streaming/util/util.go @@ -1,21 +1,23 @@ package util import ( + "fmt" + "github.com/cosmos/gogoproto/proto" ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types" clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" ) // GetOffchainUpdatesV1 unmarshals messages in offchain updates to OffchainUpdateV1. -func GetOffchainUpdatesV1(offchainUpdates *clobtypes.OffchainUpdates) ([]ocutypes.OffChainUpdateV1, error) { +func GetOffchainUpdatesV1(offchainUpdates *clobtypes.OffchainUpdates) []ocutypes.OffChainUpdateV1 { v1updates := make([]ocutypes.OffChainUpdateV1, 0) for _, message := range offchainUpdates.Messages { var update ocutypes.OffChainUpdateV1 err := proto.Unmarshal(message.Message.Value, &update) if err != nil { - return nil, err + panic(fmt.Sprintf("Failed to get OffchainUpdatesV1: %v", err)) } v1updates = append(v1updates, update) } - return v1updates, nil + return v1updates } diff --git a/protocol/testutil/memclob/keeper.go b/protocol/testutil/memclob/keeper.go index 22ba76acdd..376f6fb30a 100644 --- a/protocol/testutil/memclob/keeper.go +++ b/protocol/testutil/memclob/keeper.go @@ -508,9 +508,9 @@ func (f *FakeMemClobKeeper) SendOrderbookUpdates( ) { } -func (f *FakeMemClobKeeper) SendOrderbookFillUpdates( +func (f *FakeMemClobKeeper) SendOrderbookFillUpdate( ctx sdk.Context, - orderbookFills []types.StreamOrderbookFill, + orderbookFill types.StreamOrderbookFill, ) { } diff --git a/protocol/x/clob/keeper/keeper.go b/protocol/x/clob/keeper/keeper.go index 8c39342ca6..5ceaad43a8 100644 --- a/protocol/x/clob/keeper/keeper.go +++ b/protocol/x/clob/keeper/keeper.go @@ -310,22 +310,19 @@ func (k Keeper) SendOrderbookUpdates( k.GetFullNodeStreamingManager().SendOrderbookUpdates( offchainUpdates, lib.MustConvertIntegerToUint32(ctx.BlockHeight()), - ctx.ExecMode(), + ctx, ) } -// SendOrderbookFillUpdates sends the orderbook fills to the Full Node streaming manager. -func (k Keeper) SendOrderbookFillUpdates( +// SendOrderbookFillUpdate sends the orderbook fills to the Full Node streaming manager. +func (k Keeper) SendOrderbookFillUpdate( ctx sdk.Context, - orderbookFills []types.StreamOrderbookFill, + orderbookFill types.StreamOrderbookFill, ) { - if len(orderbookFills) == 0 { - return - } - k.GetFullNodeStreamingManager().SendOrderbookFillUpdates( - orderbookFills, + k.GetFullNodeStreamingManager().SendOrderbookFillUpdate( + orderbookFill, lib.MustConvertIntegerToUint32(ctx.BlockHeight()), - ctx.ExecMode(), + ctx, k.PerpetualIdToClobPairId, ) } @@ -338,6 +335,6 @@ func (k Keeper) SendTakerOrderStatus( k.GetFullNodeStreamingManager().SendTakerOrderStatus( takerOrder, lib.MustConvertIntegerToUint32(ctx.BlockHeight()), - ctx.ExecMode(), + ctx, ) } diff --git a/protocol/x/clob/keeper/process_operations.go b/protocol/x/clob/keeper/process_operations.go index 026d9d316c..ad1c117eb7 100644 --- a/protocol/x/clob/keeper/process_operations.go +++ b/protocol/x/clob/keeper/process_operations.go @@ -560,9 +560,11 @@ func (k Keeper) PersistMatchOrdersToState( makerOrders, ) - k.GetFullNodeStreamingManager().StageFinalizeBlockFill( - ctx, + k.GetFullNodeStreamingManager().SendOrderbookFillUpdate( streamOrderbookFill, + uint32(ctx.BlockHeight()), + ctx, + k.PerpetualIdToClobPairId, ) } @@ -670,9 +672,11 @@ func (k Keeper) PersistMatchLiquidationToState( takerOrder, makerOrders, ) - k.GetFullNodeStreamingManager().StageFinalizeBlockFill( - ctx, + k.GetFullNodeStreamingManager().SendOrderbookFillUpdate( streamOrderbookFill, + uint32(ctx.BlockHeight()), + ctx, + k.PerpetualIdToClobPairId, ) } return nil @@ -843,11 +847,9 @@ func (k Keeper) PersistMatchDeleveragingToState( }, }, } - k.SendOrderbookFillUpdates( + k.SendOrderbookFillUpdate( ctx, - []types.StreamOrderbookFill{ - streamOrderbookFill, - }, + streamOrderbookFill, ) } } diff --git a/protocol/x/clob/memclob/memclob.go b/protocol/x/clob/memclob/memclob.go index d6d1e08774..db541650c3 100644 --- a/protocol/x/clob/memclob/memclob.go +++ b/protocol/x/clob/memclob/memclob.go @@ -402,7 +402,7 @@ func (m *MemClobPriceTimePriority) mustUpdateMemclobStateWithMatches( ) clobMatch := internalOperation.GetMatch() orderbookMatchFill := m.GenerateStreamOrderbookFill(ctx, *clobMatch, takerOrder, makerOrders) - m.clobKeeper.SendOrderbookFillUpdates(ctx, []types.StreamOrderbookFill{orderbookMatchFill}) + m.clobKeeper.SendOrderbookFillUpdate(ctx, orderbookMatchFill) } // Build a slice of all subaccounts which had matches this matching loop, and sort them for determinism. diff --git a/protocol/x/clob/types/mem_clob_keeper.go b/protocol/x/clob/types/mem_clob_keeper.go index d555367718..6e25cadf35 100644 --- a/protocol/x/clob/types/mem_clob_keeper.go +++ b/protocol/x/clob/types/mem_clob_keeper.go @@ -102,9 +102,9 @@ type MemClobKeeper interface { ctx sdk.Context, offchainUpdates *OffchainUpdates, ) - SendOrderbookFillUpdates( + SendOrderbookFillUpdate( ctx sdk.Context, - orderbookFills []StreamOrderbookFill, + orderbookFill StreamOrderbookFill, ) SendTakerOrderStatus( ctx sdk.Context, diff --git a/protocol/x/clob/types/streaming.pb.go b/protocol/x/clob/types/streaming.pb.go index 83b8db719a..1f6f552fb3 100644 --- a/protocol/x/clob/types/streaming.pb.go +++ b/protocol/x/clob/types/streaming.pb.go @@ -30,6 +30,7 @@ type StagedFinalizeBlockEvent struct { // Types that are valid to be assigned to Event: // *StagedFinalizeBlockEvent_OrderFill // *StagedFinalizeBlockEvent_SubaccountUpdate + // *StagedFinalizeBlockEvent_OrderbookUpdate Event isStagedFinalizeBlockEvent_Event `protobuf_oneof:"event"` } @@ -78,9 +79,13 @@ type StagedFinalizeBlockEvent_OrderFill struct { type StagedFinalizeBlockEvent_SubaccountUpdate struct { SubaccountUpdate *types.StreamSubaccountUpdate `protobuf:"bytes,2,opt,name=subaccount_update,json=subaccountUpdate,proto3,oneof" json:"subaccount_update,omitempty"` } +type StagedFinalizeBlockEvent_OrderbookUpdate struct { + OrderbookUpdate *StreamOrderbookUpdate `protobuf:"bytes,3,opt,name=orderbook_update,json=orderbookUpdate,proto3,oneof" json:"orderbook_update,omitempty"` +} func (*StagedFinalizeBlockEvent_OrderFill) isStagedFinalizeBlockEvent_Event() {} func (*StagedFinalizeBlockEvent_SubaccountUpdate) isStagedFinalizeBlockEvent_Event() {} +func (*StagedFinalizeBlockEvent_OrderbookUpdate) isStagedFinalizeBlockEvent_Event() {} func (m *StagedFinalizeBlockEvent) GetEvent() isStagedFinalizeBlockEvent_Event { if m != nil { @@ -103,11 +108,19 @@ func (m *StagedFinalizeBlockEvent) GetSubaccountUpdate() *types.StreamSubaccount return nil } +func (m *StagedFinalizeBlockEvent) GetOrderbookUpdate() *StreamOrderbookUpdate { + if x, ok := m.GetEvent().(*StagedFinalizeBlockEvent_OrderbookUpdate); ok { + return x.OrderbookUpdate + } + return nil +} + // XXX_OneofWrappers is for the internal use of the proto package. func (*StagedFinalizeBlockEvent) XXX_OneofWrappers() []interface{} { return []interface{}{ (*StagedFinalizeBlockEvent_OrderFill)(nil), (*StagedFinalizeBlockEvent_SubaccountUpdate)(nil), + (*StagedFinalizeBlockEvent_OrderbookUpdate)(nil), } } @@ -118,25 +131,26 @@ func init() { func init() { proto.RegisterFile("dydxprotocol/clob/streaming.proto", fileDescriptor_cecf6ffcf2554dee) } var fileDescriptor_cecf6ffcf2554dee = []byte{ - // 281 bytes of a gzipped FileDescriptorProto + // 303 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x52, 0x4c, 0xa9, 0x4c, 0xa9, 0x28, 0x28, 0xca, 0x2f, 0xc9, 0x4f, 0xce, 0xcf, 0xd1, 0x4f, 0xce, 0xc9, 0x4f, 0xd2, 0x2f, 0x2e, 0x29, 0x4a, 0x4d, 0xcc, 0xcd, 0xcc, 0x4b, 0xd7, 0x03, 0x8b, 0x0b, 0x09, 0x22, 0x2b, 0xd1, 0x03, 0x29, 0x91, 0xd2, 0x40, 0xd1, 0x55, 0x5c, 0x9a, 0x94, 0x98, 0x9c, 0x9c, 0x5f, 0x9a, 0x57, 0x52, - 0x8c, 0xae, 0x59, 0x4a, 0x16, 0xd3, 0xfc, 0xc2, 0xd2, 0xd4, 0xa2, 0x4a, 0x88, 0xb4, 0xd2, 0x59, - 0x46, 0x2e, 0x89, 0xe0, 0x92, 0xc4, 0xf4, 0xd4, 0x14, 0xb7, 0xcc, 0xbc, 0xc4, 0x9c, 0xcc, 0xaa, + 0x8c, 0xae, 0x59, 0x4a, 0x16, 0xd3, 0xfc, 0xc2, 0xd2, 0xd4, 0xa2, 0x4a, 0x88, 0xb4, 0xd2, 0x12, + 0x26, 0x2e, 0x89, 0xe0, 0x92, 0xc4, 0xf4, 0xd4, 0x14, 0xb7, 0xcc, 0xbc, 0xc4, 0x9c, 0xcc, 0xaa, 0x54, 0xa7, 0x9c, 0xfc, 0xe4, 0x6c, 0xd7, 0xb2, 0xd4, 0xbc, 0x12, 0x21, 0x77, 0x2e, 0xae, 0xfc, 0xa2, 0x94, 0xd4, 0xa2, 0xf8, 0xb4, 0xcc, 0x9c, 0x1c, 0x09, 0x46, 0x05, 0x46, 0x0d, 0x6e, 0x23, 0x35, 0x3d, 0x0c, 0xd7, 0xe8, 0x05, 0x83, 0xed, 0xf4, 0x07, 0x29, 0x4d, 0xca, 0xcf, 0xcf, 0x76, 0xcb, 0xcc, 0xc9, 0xf1, 0x60, 0x08, 0xe2, 0x04, 0xeb, 0x05, 0x71, 0x84, 0xe2, 0xb9, 0x04, 0x11, 0x6e, 0x8c, 0x2f, 0x2d, 0x48, 0x49, 0x2c, 0x49, 0x95, 0x60, 0x02, 0x9b, 0x67, 0x80, 0x6a, 0x1e, 0x92, 0x57, 0xa0, 0xc6, 0x06, 0xc3, 0x45, 0x42, 0xc1, 0xfa, 0x3c, 0x18, 0x82, 0x04, 0x8a, 0xd1, - 0xc4, 0x9c, 0xd8, 0xb9, 0x58, 0x53, 0x41, 0x4e, 0x76, 0x0a, 0x38, 0xf1, 0x48, 0x8e, 0xf1, 0xc2, - 0x23, 0x39, 0xc6, 0x07, 0x8f, 0xe4, 0x18, 0x27, 0x3c, 0x96, 0x63, 0xb8, 0xf0, 0x58, 0x8e, 0xe1, - 0xc6, 0x63, 0x39, 0x86, 0x28, 0xb3, 0xf4, 0xcc, 0x92, 0x8c, 0xd2, 0x24, 0xbd, 0xe4, 0xfc, 0x5c, - 0x7d, 0x94, 0x30, 0x29, 0x33, 0xd1, 0x4d, 0xce, 0x48, 0xcc, 0xcc, 0xd3, 0x87, 0x8b, 0x54, 0x40, - 0xc2, 0xa9, 0xa4, 0xb2, 0x20, 0xb5, 0x38, 0x89, 0x0d, 0x2c, 0x6c, 0x0c, 0x08, 0x00, 0x00, 0xff, - 0xff, 0x65, 0x71, 0xd8, 0xa8, 0xa9, 0x01, 0x00, 0x00, + 0xc4, 0x84, 0x42, 0xb9, 0x04, 0xf2, 0x61, 0xd6, 0xc3, 0xcc, 0x67, 0x06, 0x9b, 0xaf, 0x41, 0xd8, + 0xbd, 0x70, 0x73, 0xf9, 0xf3, 0x51, 0x85, 0x9c, 0xd8, 0xb9, 0x58, 0x53, 0x41, 0x21, 0xe1, 0x14, + 0x70, 0xe2, 0x91, 0x1c, 0xe3, 0x85, 0x47, 0x72, 0x8c, 0x0f, 0x1e, 0xc9, 0x31, 0x4e, 0x78, 0x2c, + 0xc7, 0x70, 0xe1, 0xb1, 0x1c, 0xc3, 0x8d, 0xc7, 0x72, 0x0c, 0x51, 0x66, 0xe9, 0x99, 0x25, 0x19, + 0xa5, 0x49, 0x7a, 0xc9, 0xf9, 0xb9, 0xfa, 0x28, 0x41, 0x5d, 0x66, 0xa2, 0x9b, 0x9c, 0x91, 0x98, + 0x99, 0xa7, 0x0f, 0x17, 0xa9, 0x80, 0x04, 0x7f, 0x49, 0x65, 0x41, 0x6a, 0x71, 0x12, 0x1b, 0x58, + 0xd8, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0xe1, 0xc0, 0xbc, 0x6c, 0x00, 0x02, 0x00, 0x00, } func (m *StagedFinalizeBlockEvent) Marshal() (dAtA []byte, err error) { @@ -213,6 +227,27 @@ func (m *StagedFinalizeBlockEvent_SubaccountUpdate) MarshalToSizedBuffer(dAtA [] } return len(dAtA) - i, nil } +func (m *StagedFinalizeBlockEvent_OrderbookUpdate) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *StagedFinalizeBlockEvent_OrderbookUpdate) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.OrderbookUpdate != nil { + { + size, err := m.OrderbookUpdate.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintStreaming(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x1a + } + return len(dAtA) - i, nil +} func encodeVarintStreaming(dAtA []byte, offset int, v uint64) int { offset -= sovStreaming(v) base := offset @@ -260,6 +295,18 @@ func (m *StagedFinalizeBlockEvent_SubaccountUpdate) Size() (n int) { } return n } +func (m *StagedFinalizeBlockEvent_OrderbookUpdate) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.OrderbookUpdate != nil { + l = m.OrderbookUpdate.Size() + n += 1 + l + sovStreaming(uint64(l)) + } + return n +} func sovStreaming(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 @@ -366,6 +413,41 @@ func (m *StagedFinalizeBlockEvent) Unmarshal(dAtA []byte) error { } m.Event = &StagedFinalizeBlockEvent_SubaccountUpdate{v} iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field OrderbookUpdate", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStreaming + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthStreaming + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthStreaming + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + v := &StreamOrderbookUpdate{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.Event = &StagedFinalizeBlockEvent_OrderbookUpdate{v} + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipStreaming(dAtA[iNdEx:]) diff --git a/protocol/x/subaccounts/keeper/subaccount.go b/protocol/x/subaccounts/keeper/subaccount.go index 5eb650cce0..e72ccd60e7 100644 --- a/protocol/x/subaccounts/keeper/subaccount.go +++ b/protocol/x/subaccounts/keeper/subaccount.go @@ -445,7 +445,7 @@ func (k Keeper) UpdateSubaccounts( if lib.IsDeliverTxMode(ctx) && k.GetFullNodeStreamingManager().Enabled() { if k.GetFullNodeStreamingManager().TracksSubaccountId(*u.SettledSubaccount.Id) { subaccountUpdate := GenerateStreamSubaccountUpdate(u, fundingPayments) - k.GetFullNodeStreamingManager().StageFinalizeBlockSubaccountUpdate( + k.GetFullNodeStreamingManager().SendSubaccountUpdate( ctx, subaccountUpdate, ) From 1fb2b957dbab0ad0510ae0f82faae986fc1c1618 Mon Sep 17 00:00:00 2001 From: Teddy Ding Date: Mon, 30 Sep 2024 13:15:17 -0400 Subject: [PATCH 2/3] Address comments --- .../streaming/full_node_streaming_manager.go | 47 ++++++++++++------- protocol/streaming/noop_streaming_manager.go | 3 -- protocol/streaming/types/interface.go | 3 -- protocol/x/clob/keeper/keeper.go | 3 -- protocol/x/clob/keeper/process_operations.go | 2 - 5 files changed, 29 insertions(+), 29 deletions(-) diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index ba1d550c3d..759b955c55 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -389,7 +389,7 @@ func getStagedEventsCount(store storetypes.KVStore) uint32 { return binary.BigEndian.Uint32(countsBytes) } -// Stage a subaccount update event in transient store, during `FinalizeBlock`. +// Send a subaccount update event. func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate( ctx sdk.Context, subaccountUpdate satypes.StreamSubaccountUpdate, @@ -399,7 +399,7 @@ func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate( return } - // If `DeliverTx`, updates should be staged to be streamed after consensus finalizese on a block. + // If `DeliverTx`, updates should be staged to be streamed after consensus finalizes on a block. stagedEvent := clobtypes.StagedFinalizeBlockEvent{ Event: &clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate{ SubaccountUpdate: &subaccountUpdate, @@ -489,6 +489,8 @@ func getStreamUpdatesFromOffchainUpdates( ) (streamUpdates []clobtypes.StreamUpdate, clobPairIds []uint32) { // Group updates by clob pair id. clobPairIdToV1Updates := make(map[uint32][]ocutypes.OffChainUpdateV1) + // unique list of clob pair Ids to send updates for. + clobPairIds = make([]uint32, 0) for _, v1update := range v1updates { var clobPairId uint32 switch u := v1update.UpdateMessage.(type) { @@ -506,14 +508,23 @@ func getStreamUpdatesFromOffchainUpdates( if _, ok := clobPairIdToV1Updates[clobPairId]; !ok { clobPairIdToV1Updates[clobPairId] = []ocutypes.OffChainUpdateV1{} + clobPairIds = append(clobPairIds, clobPairId) } clobPairIdToV1Updates[clobPairId] = append(clobPairIdToV1Updates[clobPairId], v1update) } // Unmarshal each per-clob pair message to v1 updates. - streamUpdates = make([]clobtypes.StreamUpdate, 0) - clobPairIds = make([]uint32, 0) - for clobPairId, v1updates := range clobPairIdToV1Updates { + streamUpdates = make([]clobtypes.StreamUpdate, len(clobPairIds)) + + for _, clobPairId := range clobPairIds { + v1updates, exists := clobPairIdToV1Updates[clobPairId] + if !exists { + panic(fmt.Sprintf( + "clob pair id %v not found in clobPairIdToV1Updates: %v", + clobPairId, + clobPairIdToV1Updates, + )) + } streamUpdate := clobtypes.StreamUpdate{ UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ @@ -525,7 +536,6 @@ func getStreamUpdatesFromOffchainUpdates( ExecMode: uint32(execMode), } streamUpdates = append(streamUpdates, streamUpdate) - clobPairIds = append(clobPairIds, clobPairId) } return streamUpdates, clobPairIds @@ -535,7 +545,6 @@ func getStreamUpdatesFromOffchainUpdates( // sends messages to the subscribers. func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates( offchainUpdates *clobtypes.OffchainUpdates, - blockHeight uint32, ctx sdk.Context, ) { v1updates := streaming_util.GetOffchainUpdatesV1(offchainUpdates) @@ -548,12 +557,16 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates( time.Now(), ) - streamUpdates, clobPairIds := getStreamUpdatesFromOffchainUpdates(v1updates, blockHeight, ctx.ExecMode()) + streamUpdates, clobPairIds := getStreamUpdatesFromOffchainUpdates( + v1updates, + lib.MustConvertIntegerToUint32(ctx.BlockHeight()), + ctx.ExecMode(), + ) sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds) return } - // If `DeliverTx`, updates should be staged to be streamed after consensus finalizese on a block. + // If `DeliverTx`, updates should be staged to be streamed after consensus finalizes on a block. stagedEvent := clobtypes.StagedFinalizeBlockEvent{ Event: &clobtypes.StagedFinalizeBlockEvent_OrderbookUpdate{ OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ @@ -609,7 +622,6 @@ func (sm *FullNodeStreamingManagerImpl) getStreamUpdatesForOrderbookFills( // sends messages to the subscribers. func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdate( orderbookFill clobtypes.StreamOrderbookFill, - blockHeight uint32, ctx sdk.Context, perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, ) { @@ -623,7 +635,7 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdate( streamUpdates, clobPairIds := sm.getStreamUpdatesForOrderbookFills( []clobtypes.StreamOrderbookFill{orderbookFill}, - blockHeight, + lib.MustConvertIntegerToUint32(ctx.BlockHeight()), ctx.ExecMode(), perpetualIdToClobPairId, ) @@ -631,7 +643,7 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdate( return } - // If `DeliverTx`, updates should be staged to be streamed after consensus finalizese on a block. + // If `DeliverTx`, updates should be staged to be streamed after consensus finalizes on a block. stagedEvent := clobtypes.StagedFinalizeBlockEvent{ Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{ OrderFill: &orderbookFill, @@ -647,7 +659,6 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdate( // SendTakerOrderStatus sends out a taker order and its status to the full node streaming service. func (sm *FullNodeStreamingManagerImpl) SendTakerOrderStatus( streamTakerOrder clobtypes.StreamTakerOrder, - blockHeight uint32, ctx sdk.Context, ) { // In current design, we never send this during `DeliverTx` (`FinalizeBlock`). @@ -667,7 +678,7 @@ func (sm *FullNodeStreamingManagerImpl) SendTakerOrderStatus( UpdateMessage: &clobtypes.StreamUpdate_TakerOrder{ TakerOrder: &streamTakerOrder, }, - BlockHeight: blockHeight, + BlockHeight: lib.MustConvertIntegerToUint32(ctx.BlockHeight()), ExecMode: uint32(ctx.ExecMode()), }, }, @@ -925,7 +936,7 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock( // Cache updates to sync local ops queue sycnLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates( streaming_util.GetOffchainUpdatesV1(orderBookUpdatesToSyncLocalOpsQueue), - uint32(ctx.BlockHeight()), + lib.MustConvertIntegerToUint32(ctx.BlockHeight()), ctx.ExecMode(), ) sm.cacheStreamUpdatesByClobPairWithLock(sycnLocalUpdates, syncLocalClobPairIds) @@ -933,7 +944,7 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock( // Cache updates for finalized fills. fillStreamUpdates, fillClobPairIds := sm.getStreamUpdatesForOrderbookFills( finalizedFills, - uint32(ctx.BlockHeight()), + lib.MustConvertIntegerToUint32(ctx.BlockHeight()), ctx.ExecMode(), perpetualIdToClobPairId, ) @@ -943,7 +954,7 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock( for _, finalizedUpdate := range finalizedOrderbookUpdates { streamUpdates, clobPairIds := getStreamUpdatesFromOffchainUpdates( finalizedUpdate.Updates, - uint32(ctx.BlockHeight()), + lib.MustConvertIntegerToUint32(ctx.BlockHeight()), ctx.ExecMode(), ) sm.cacheStreamUpdatesByClobPairWithLock(streamUpdates, clobPairIds) @@ -952,7 +963,7 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock( // Finally, cache updates for finalized subaccount updates subaccountStreamUpdates, subaccountIds := getStreamUpdatesForSubaccountUpdates( finalizedSubaccountUpdates, - uint32(ctx.BlockHeight()), + lib.MustConvertIntegerToUint32(ctx.BlockHeight()), ctx.ExecMode(), ) sm.cacheStreamUpdatesBySubaccountWithLock(subaccountStreamUpdates, subaccountIds) diff --git a/protocol/streaming/noop_streaming_manager.go b/protocol/streaming/noop_streaming_manager.go index 5358b9b098..9dc7bf6de9 100644 --- a/protocol/streaming/noop_streaming_manager.go +++ b/protocol/streaming/noop_streaming_manager.go @@ -31,14 +31,12 @@ func (sm *NoopGrpcStreamingManager) Subscribe( func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates( updates *clobtypes.OffchainUpdates, - blockHeight uint32, ctx sdk.Context, ) { } func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdate( orderbookFill clobtypes.StreamOrderbookFill, - blockHeight uint32, ctx sdk.Context, perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, ) { @@ -46,7 +44,6 @@ func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdate( func (sm *NoopGrpcStreamingManager) SendTakerOrderStatus( takerOrder clobtypes.StreamTakerOrder, - blockHeight uint32, ctx sdk.Context, ) { } diff --git a/protocol/streaming/types/interface.go b/protocol/streaming/types/interface.go index cddaada7d7..5b42864016 100644 --- a/protocol/streaming/types/interface.go +++ b/protocol/streaming/types/interface.go @@ -31,18 +31,15 @@ type FullNodeStreamingManager interface { ) map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate SendOrderbookUpdates( offchainUpdates *clobtypes.OffchainUpdates, - blockHeight uint32, ctx sdk.Context, ) SendOrderbookFillUpdate( orderbookFill clobtypes.StreamOrderbookFill, - blockHeight uint32, ctx sdk.Context, perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, ) SendTakerOrderStatus( takerOrder clobtypes.StreamTakerOrder, - blockHeight uint32, ctx sdk.Context, ) SendFinalizedSubaccountUpdates( diff --git a/protocol/x/clob/keeper/keeper.go b/protocol/x/clob/keeper/keeper.go index 5ceaad43a8..f49eb61271 100644 --- a/protocol/x/clob/keeper/keeper.go +++ b/protocol/x/clob/keeper/keeper.go @@ -309,7 +309,6 @@ func (k Keeper) SendOrderbookUpdates( k.GetFullNodeStreamingManager().SendOrderbookUpdates( offchainUpdates, - lib.MustConvertIntegerToUint32(ctx.BlockHeight()), ctx, ) } @@ -321,7 +320,6 @@ func (k Keeper) SendOrderbookFillUpdate( ) { k.GetFullNodeStreamingManager().SendOrderbookFillUpdate( orderbookFill, - lib.MustConvertIntegerToUint32(ctx.BlockHeight()), ctx, k.PerpetualIdToClobPairId, ) @@ -334,7 +332,6 @@ func (k Keeper) SendTakerOrderStatus( ) { k.GetFullNodeStreamingManager().SendTakerOrderStatus( takerOrder, - lib.MustConvertIntegerToUint32(ctx.BlockHeight()), ctx, ) } diff --git a/protocol/x/clob/keeper/process_operations.go b/protocol/x/clob/keeper/process_operations.go index ad1c117eb7..86808afb5d 100644 --- a/protocol/x/clob/keeper/process_operations.go +++ b/protocol/x/clob/keeper/process_operations.go @@ -562,7 +562,6 @@ func (k Keeper) PersistMatchOrdersToState( k.GetFullNodeStreamingManager().SendOrderbookFillUpdate( streamOrderbookFill, - uint32(ctx.BlockHeight()), ctx, k.PerpetualIdToClobPairId, ) @@ -674,7 +673,6 @@ func (k Keeper) PersistMatchLiquidationToState( ) k.GetFullNodeStreamingManager().SendOrderbookFillUpdate( streamOrderbookFill, - uint32(ctx.BlockHeight()), ctx, k.PerpetualIdToClobPairId, ) From 804b7d2b3d054ffbb42456e057fc53200509dec0 Mon Sep 17 00:00:00 2001 From: Teddy Ding Date: Mon, 30 Sep 2024 14:12:12 -0400 Subject: [PATCH 3/3] Fix list append bug --- protocol/streaming/full_node_streaming_manager.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index 759b955c55..85c265f12e 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -516,7 +516,7 @@ func getStreamUpdatesFromOffchainUpdates( // Unmarshal each per-clob pair message to v1 updates. streamUpdates = make([]clobtypes.StreamUpdate, len(clobPairIds)) - for _, clobPairId := range clobPairIds { + for i, clobPairId := range clobPairIds { v1updates, exists := clobPairIdToV1Updates[clobPairId] if !exists { panic(fmt.Sprintf( @@ -525,7 +525,7 @@ func getStreamUpdatesFromOffchainUpdates( clobPairIdToV1Updates, )) } - streamUpdate := clobtypes.StreamUpdate{ + streamUpdates[i] = clobtypes.StreamUpdate{ UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ Updates: v1updates, @@ -535,7 +535,6 @@ func getStreamUpdatesFromOffchainUpdates( BlockHeight: blockHeight, ExecMode: uint32(execMode), } - streamUpdates = append(streamUpdates, streamUpdate) } return streamUpdates, clobPairIds