From 39213e7c20ea9656f4bddb1d7c305fb365f9f113 Mon Sep 17 00:00:00 2001 From: Matthew Weeks Date: Thu, 9 Jan 2025 17:34:37 -0500 Subject: [PATCH] Full Node Streaming Order Filter utils and tests --- .../streaming/full_node_streaming_manager.go | 89 ++++-- .../full_node_streaming_manager_test.go | 288 ++++++++++++++++++ protocol/streaming/util/util.go | 18 +- 3 files changed, 356 insertions(+), 39 deletions(-) create mode 100644 protocol/streaming/full_node_streaming_manager_test.go diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index ea1bb738af..be7962f230 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -7,24 +7,20 @@ import ( "sync/atomic" "time" - "github.com/dydxprotocol/v4-chain/protocol/lib" - pricestypes "github.com/dydxprotocol/v4-chain/protocol/x/prices/types" - satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" - "cosmossdk.io/log" storetypes "cosmossdk.io/store/types" "github.com/cosmos/cosmos-sdk/codec" sdk "github.com/cosmos/cosmos-sdk/types" ante_types "github.com/dydxprotocol/v4-chain/protocol/app/ante/types" + "github.com/dydxprotocol/v4-chain/protocol/finalizeblock" + ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types" + "github.com/dydxprotocol/v4-chain/protocol/lib" "github.com/dydxprotocol/v4-chain/protocol/lib/metrics" "github.com/dydxprotocol/v4-chain/protocol/streaming/types" - "github.com/dydxprotocol/v4-chain/protocol/streaming/util" streaming_util "github.com/dydxprotocol/v4-chain/protocol/streaming/util" clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" - - ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types" - - "github.com/dydxprotocol/v4-chain/protocol/finalizeblock" + pricestypes "github.com/dydxprotocol/v4-chain/protocol/x/prices/types" + satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" ) var _ types.FullNodeStreamingManager = (*FullNodeStreamingManagerImpl)(nil) @@ -96,9 +92,41 @@ type OrderbookSubscription struct { // If interval snapshots are turned on, the next block height at which // a snapshot should be sent out. nextSnapshotBlock uint32 +} + +func NewOrderbookSubscription( + subscriptionId uint32, + clobPairIds []uint32, + subaccountIds []satypes.SubaccountId, + marketIds []uint32, + messageSender types.OutgoingMessageSender, + updatesChannel chan []clobtypes.StreamUpdate, +) *OrderbookSubscription { + return &OrderbookSubscription{ + subscriptionId: subscriptionId, + initialized: &atomic.Bool{}, // False by default. + clobPairIds: clobPairIds, + subaccountIds: subaccountIds, + marketIds: marketIds, + messageSender: messageSender, + updatesChannel: updatesChannel, + } +} - // Filter orders for subaccountIds - filterOrders bool +func (sm *FullNodeStreamingManagerImpl) NewOrderbookSubscription( + clobPairIds []uint32, + subaccountIds []satypes.SubaccountId, + marketIds []uint32, + messageSender types.OutgoingMessageSender, +) *OrderbookSubscription { + return NewOrderbookSubscription( + sm.getNextAvailableSubscriptionId(), + clobPairIds, + subaccountIds, + marketIds, + messageSender, + make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize), + ) } func (sub *OrderbookSubscription) IsInitialized() bool { @@ -196,7 +224,10 @@ func (sm *FullNodeStreamingManagerImpl) getNextAvailableSubscriptionId() uint32 // If a StreamUpdate_OrderUpdate contains no updates for subscribed subaccounts, drop message // If a StreamUpdate_OrderUpdate contains updates for subscribed subaccounts, construct a new // StreamUpdate_OrderUpdate with updates only for subscribed subaccounts -func (sub *OrderbookSubscription) FilterSubaccountStreamUpdates(output chan []clobtypes.StreamUpdate, logger log.Logger) { +func (sub *OrderbookSubscription) FilterSubaccountStreamUpdates( + output chan []clobtypes.StreamUpdate, + logger log.Logger, +) { subaccountIdNumbers := make([]uint32, len(sub.subaccountIds)) for i, subaccountId := range sub.subaccountIds { subaccountIdNumbers[i] = subaccountId.Number @@ -210,8 +241,8 @@ func (sub *OrderbookSubscription) FilterSubaccountStreamUpdates(output chan []cl case *clobtypes.StreamUpdate_OrderbookUpdate: orderBookUpdates := []ocutypes.OffChainUpdateV1{} for _, orderBookUpdate := range updateMessage.OrderbookUpdate.Updates { - orderBookUpdateSubaccountIdNumber, err := util.GetOffChainUpdateV1SubaccountIdNumber(orderBookUpdate) - if err != nil { + orderBookUpdateSubaccountIdNumber, err := streaming_util.GetOffChainUpdateV1SubaccountIdNumber(orderBookUpdate) + if err == nil { if slices.Contains(subaccountIdNumbers, orderBookUpdateSubaccountIdNumber) { orderBookUpdates = append(orderBookUpdates, orderBookUpdate) } @@ -266,17 +297,8 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe( sIds[i] = *subaccountId } - subscriptionId := sm.getNextAvailableSubscriptionId() + subscription := sm.NewOrderbookSubscription(clobPairIds, sIds, marketIds, messageSender) - subscription := &OrderbookSubscription{ - subscriptionId: subscriptionId, - initialized: &atomic.Bool{}, // False by default. - clobPairIds: clobPairIds, - subaccountIds: sIds, - marketIds: marketIds, - messageSender: messageSender, - updatesChannel: make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize), - } for _, clobPairId := range clobPairIds { // if clobPairId exists in the map, append the subscription id to the slice // otherwise, create a new slice with the subscription id @@ -325,16 +347,21 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe( sm.Unlock() // If filterOrders, listen to filtered channel and start filter goroutine - // Error if fitlerOrders but no subaccounts are subscribed + // Error if filterOrders but no subaccounts are subscribed filteredUpdateChannel := subscription.updatesChannel - if subscription.filterOrders { + if filterOrders { if len(subaccountIds) == 0 { - // TODO panic? - // log error + sm.logger.Error( + fmt.Sprintf( + "filterOrders requires subaccountIds for subscription id: %+v", + subscription.subscriptionId, + ), + ) + } else { + filteredUpdateChannel = make(chan []clobtypes.StreamUpdate) + defer close(filteredUpdateChannel) + go subscription.FilterSubaccountStreamUpdates(filteredUpdateChannel, sm.logger) } - filteredUpdateChannel = make(chan []clobtypes.StreamUpdate) - defer close(filteredUpdateChannel) - go subscription.FilterSubaccountStreamUpdates(filteredUpdateChannel, sm.logger) } // Use current goroutine to consistently poll subscription channel for updates diff --git a/protocol/streaming/full_node_streaming_manager_test.go b/protocol/streaming/full_node_streaming_manager_test.go new file mode 100644 index 0000000000..7d07164b58 --- /dev/null +++ b/protocol/streaming/full_node_streaming_manager_test.go @@ -0,0 +1,288 @@ +package streaming_test + +import ( + "testing" + "time" + + sdktypes "github.com/cosmos/cosmos-sdk/types" + ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types" + pv1types "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1/types" + sharedtypes "github.com/dydxprotocol/v4-chain/protocol/indexer/shared/types" + "github.com/dydxprotocol/v4-chain/protocol/mocks" + streaming "github.com/dydxprotocol/v4-chain/protocol/streaming" + clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" + satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" + "github.com/stretchr/testify/require" +) + +const ( + maxSubscriptionChannelSize = 2 ^ 10 + owner = "foo" + noMessagesMaxSleep = 10 * time.Millisecond +) + +func OpenOrder( + order *pv1types.IndexerOrder, + timestamp *time.Time, +) ocutypes.OffChainUpdateV1 { + return ocutypes.OffChainUpdateV1{ + UpdateMessage: &ocutypes.OffChainUpdateV1_OrderPlace{ + OrderPlace: &ocutypes.OrderPlaceV1{ + Order: order, + PlacementStatus: ocutypes.OrderPlaceV1_ORDER_PLACEMENT_STATUS_OPENED, + TimeStamp: timestamp, + }, + }, + } +} + +func CancelOrder( + removedOrderId *pv1types.IndexerOrderId, + timestamp *time.Time, +) ocutypes.OffChainUpdateV1 { + return ocutypes.OffChainUpdateV1{ + UpdateMessage: &ocutypes.OffChainUpdateV1_OrderRemove{ + OrderRemove: &ocutypes.OrderRemoveV1{ + RemovedOrderId: removedOrderId, + Reason: sharedtypes.OrderRemovalReason(ocutypes.OrderRemoveV1_ORDER_REMOVAL_STATUS_CANCELED), + RemovalStatus: ocutypes.OrderRemoveV1_ORDER_REMOVAL_STATUS_CANCELED, + TimeStamp: timestamp, + }, + }, + } +} + +func ReplaceOrder( + oldOrderId *pv1types.IndexerOrderId, + newOrder *pv1types.IndexerOrder, + timestamp *time.Time, +) ocutypes.OffChainUpdateV1 { + return ocutypes.OffChainUpdateV1{ + UpdateMessage: &ocutypes.OffChainUpdateV1_OrderReplace{ + OrderReplace: &ocutypes.OrderReplaceV1{ + OldOrderId: oldOrderId, + Order: newOrder, + PlacementStatus: ocutypes.OrderPlaceV1_ORDER_PLACEMENT_STATUS_OPENED, + TimeStamp: timestamp, + }, + }, + } +} + +func UpdateOrder(orderId *pv1types.IndexerOrderId, totalFilledQuantums uint64) ocutypes.OffChainUpdateV1 { + return ocutypes.OffChainUpdateV1{ + UpdateMessage: &ocutypes.OffChainUpdateV1_OrderUpdate{ + OrderUpdate: &ocutypes.OrderUpdateV1{ + OrderId: orderId, + TotalFilledQuantums: totalFilledQuantums, + }, + }, + } +} + +func toStreamUpdate(offChainUpdates []ocutypes.OffChainUpdateV1, blockHeight uint32) clobtypes.StreamUpdate { + return clobtypes.StreamUpdate{ + BlockHeight: blockHeight, + ExecMode: uint32(sdktypes.ExecModeFinalize), + UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ + OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ + Updates: offChainUpdates, + Snapshot: true, + }, + }, + } +} + +type MockMessageSender struct{} + +func (mms *MockMessageSender) Send(*clobtypes.StreamOrderbookUpdatesResponse) error { + return nil +} + +func NewOrderbookSubscription( + ids []uint32, + updatesChannel chan []clobtypes.StreamUpdate, +) *streaming.OrderbookSubscription { + sIds := []satypes.SubaccountId{} + for _, id := range ids { + sIds = append(sIds, satypes.SubaccountId{Owner: owner, Number: id}) + } + return streaming.NewOrderbookSubscription( + 0, + []uint32{}, + sIds, + []uint32{}, + &MockMessageSender{}, + updatesChannel, + ) +} + +type TestCase struct { + updates *[]clobtypes.StreamUpdate + subaccountIds []uint32 + filteredUpdates *[]clobtypes.StreamUpdate +} + +func TestFilterStreamUpdates(t *testing.T) { + logger := &mocks.Logger{} + + subaccountIdNumber := uint32(1337) + subaccountId := pv1types.IndexerSubaccountId{ + Owner: "foo", + Number: subaccountIdNumber, + } + orderId := pv1types.IndexerOrderId{ + SubaccountId: subaccountId, + ClientId: 0, + OrderFlags: 0, + ClobPairId: 0, + } + + order := pv1types.IndexerOrder{ + OrderId: orderId, + Side: pv1types.IndexerOrder_SIDE_BUY, + Quantums: uint64(10 ^ 6), + Subticks: 1, + GoodTilOneof: &pv1types.IndexerOrder_GoodTilBlock{ + GoodTilBlock: 10 ^ 9, + }, + TimeInForce: 10 ^ 9, + ReduceOnly: false, + ClientMetadata: 0, + ConditionType: pv1types.IndexerOrder_CONDITION_TYPE_UNSPECIFIED, + ConditionalOrderTriggerSubticks: 0, + } + + newOrderId := order.OrderId + newOrderId.ClientId += 1 + + newOrder := order + newOrder.OrderId = newOrderId + newOrder.Quantums += 10 ^ 6 + + totalFilledQuantums := uint64(1988) + + tests := make(map[string]TestCase) + + orderPlaceTime := time.Date(2024, 12, 25, 0, 0, 0, 0, time.UTC) + openOrder := OpenOrder(&order, &orderPlaceTime) + + orderCancelTime := orderPlaceTime.Add(time.Second) + cancelOrder := CancelOrder(&orderId, &orderCancelTime) + + orderReplaceTime := orderPlaceTime.Add(time.Minute) + replaceOrder := ReplaceOrder(&orderId, &newOrder, &orderReplaceTime) + + updateOrder := UpdateOrder(&orderId, totalFilledQuantums) + + baseOffChainUpdates := []ocutypes.OffChainUpdateV1{openOrder, cancelOrder, replaceOrder, updateOrder} + baseStreamUpdates := []clobtypes.StreamUpdate{toStreamUpdate(baseOffChainUpdates, 0)} + tests["baseInScope"] = TestCase{ + updates: &baseStreamUpdates, + subaccountIds: []uint32{orderId.SubaccountId.Number}, + filteredUpdates: &baseStreamUpdates, + } + tests["baseNotInScope"] = TestCase{ + updates: &baseStreamUpdates, + subaccountIds: []uint32{0}, + filteredUpdates: nil, + } + + otherOrderId := orderId + otherSubaccountIdNumber := subaccountIdNumber + uint32(1) + otherOrderId.SubaccountId = pv1types.IndexerSubaccountId{ + Owner: "bar", + Number: otherSubaccountIdNumber, + } + otherOrder := order + otherOrder.OrderId = otherOrderId + + otherNewOrderId := otherOrder.OrderId + otherNewOrderId.ClientId += 1 + + otherNewOrder := otherOrder + otherNewOrder.OrderId = otherNewOrderId + otherNewOrder.Quantums += 10 ^ 6 + + otherOpenOrder := OpenOrder(&otherOrder, &orderPlaceTime) + otherCancelOrder := CancelOrder(&otherOrderId, &orderCancelTime) + otherReplaceOrder := ReplaceOrder(&otherOrderId, &otherNewOrder, &orderReplaceTime) + otherUpdateOrder := UpdateOrder(&otherOrderId, totalFilledQuantums) + + otherOffChainUpdates := []ocutypes.OffChainUpdateV1{ + otherOpenOrder, otherCancelOrder, otherReplaceOrder, otherUpdateOrder, + } + otherStreamUpdates := []clobtypes.StreamUpdate{toStreamUpdate(otherOffChainUpdates, 0)} + tests["otherInScope"] = TestCase{ + updates: &otherStreamUpdates, + subaccountIds: []uint32{otherSubaccountIdNumber}, + filteredUpdates: &otherStreamUpdates, + } + tests["otherNotInScope"] = TestCase{ + updates: &otherStreamUpdates, + subaccountIds: []uint32{subaccountIdNumber}, + filteredUpdates: nil, + } + + bothUpdates := []clobtypes.StreamUpdate{ + toStreamUpdate(append(baseOffChainUpdates, otherOffChainUpdates...), 0), + } + tests["bothInScope"] = TestCase{ + updates: &bothUpdates, + subaccountIds: []uint32{subaccountIdNumber, otherSubaccountIdNumber}, + filteredUpdates: &bothUpdates, + } + tests["bothOtherInScope"] = TestCase{ + updates: &bothUpdates, + subaccountIds: []uint32{otherSubaccountIdNumber}, + filteredUpdates: &otherStreamUpdates, + } + tests["bothBaseInScope"] = TestCase{ + updates: &bothUpdates, + subaccountIds: []uint32{subaccountIdNumber}, + filteredUpdates: &baseStreamUpdates, + } + tests["bothNoneInScopeWrongId"] = TestCase{ + updates: &bothUpdates, + subaccountIds: []uint32{404}, + filteredUpdates: nil, + } + tests["bothNoneInScopeNoId"] = TestCase{ + updates: &bothUpdates, + subaccountIds: []uint32{}, + filteredUpdates: nil, + } + + tests["noUpdates"] = TestCase{ + updates: &[]clobtypes.StreamUpdate{}, + subaccountIds: []uint32{subaccountIdNumber}, + filteredUpdates: nil, + } + tests["noUpdatesNoId"] = TestCase{ + updates: &[]clobtypes.StreamUpdate{}, + subaccountIds: []uint32{}, + filteredUpdates: nil, + } + + for name, testCase := range tests { + t.Run(name, func(t *testing.T) { + func() { + filteredUpdatesChannel := make(chan []clobtypes.StreamUpdate, maxSubscriptionChannelSize) + defer close(filteredUpdatesChannel) + updatesChannel := make(chan []clobtypes.StreamUpdate, maxSubscriptionChannelSize) + defer close(updatesChannel) + + subscription := NewOrderbookSubscription(testCase.subaccountIds, updatesChannel) + go subscription.FilterSubaccountStreamUpdates(filteredUpdatesChannel, logger) + updatesChannel <- *testCase.updates + + if testCase.filteredUpdates != nil { + require.Equal(t, <-filteredUpdatesChannel, *testCase.filteredUpdates) + } else { + time.Sleep(noMessagesMaxSleep) + require.Equal(t, len(filteredUpdatesChannel), 0) + } + }() + }) + } +} diff --git a/protocol/streaming/util/util.go b/protocol/streaming/util/util.go index efd3be9f01..bdec459c5a 100644 --- a/protocol/streaming/util/util.go +++ b/protocol/streaming/util/util.go @@ -22,21 +22,23 @@ func GetOffchainUpdatesV1(offchainUpdates *clobtypes.OffchainUpdates) []ocutypes return v1updates } -// TODO best practice for ensuring all cases are handled -// default error? default panic? +// Error expected if OffChainUpdateV1.UpdateMessage message type is extended to more order events func GetOffChainUpdateV1SubaccountIdNumber(update ocutypes.OffChainUpdateV1) (uint32, error) { var orderSubaccountIdNumber uint32 - switch ocu1 := update.UpdateMessage.(type) { + switch updateMessage := update.UpdateMessage.(type) { case *ocutypes.OffChainUpdateV1_OrderPlace: - orderSubaccountIdNumber = ocu1.OrderPlace.Order.OrderId.SubaccountId.Number + orderSubaccountIdNumber = updateMessage.OrderPlace.Order.OrderId.SubaccountId.Number case *ocutypes.OffChainUpdateV1_OrderRemove: - orderSubaccountIdNumber = ocu1.OrderRemove.RemovedOrderId.SubaccountId.Number + orderSubaccountIdNumber = updateMessage.OrderRemove.RemovedOrderId.SubaccountId.Number case *ocutypes.OffChainUpdateV1_OrderUpdate: - orderSubaccountIdNumber = ocu1.OrderUpdate.OrderId.SubaccountId.Number + orderSubaccountIdNumber = updateMessage.OrderUpdate.OrderId.SubaccountId.Number case *ocutypes.OffChainUpdateV1_OrderReplace: - orderSubaccountIdNumber = ocu1.OrderReplace.Order.OrderId.SubaccountId.Number + orderSubaccountIdNumber = updateMessage.OrderReplace.Order.OrderId.SubaccountId.Number default: - return 0, fmt.Errorf("UpdateMessage type not in {OrderPlace, OrderRemove, OrderUpdate, OrderReplace}") + return 0, fmt.Errorf( + "UpdateMessage type not in {OrderPlace, OrderRemove, OrderUpdate, OrderReplace}: %+v", + updateMessage, + ) } return orderSubaccountIdNumber, nil }