diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index 6cf24bd0b51..af0d5671cdc 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -35,7 +35,7 @@ type FullNodeStreamingManagerImpl struct { // orderbookSubscriptions maps subscription IDs to their respective orderbook subscriptions. orderbookSubscriptions map[uint32]*OrderbookSubscription - nextSubscriptionId uint32 + activeSubscriptionIds map[uint32]bool // stream will batch and flush out messages every 10 ms. ticker *time.Ticker @@ -106,7 +106,7 @@ func NewFullNodeStreamingManager( fullNodeStreamingManager := &FullNodeStreamingManagerImpl{ logger: logger, orderbookSubscriptions: make(map[uint32]*OrderbookSubscription), - nextSubscriptionId: 0, + activeSubscriptionIds: make(map[uint32]bool), ticker: time.NewTicker(time.Duration(flushIntervalMs) * time.Millisecond), done: make(chan bool), @@ -170,6 +170,16 @@ func (sm *FullNodeStreamingManagerImpl) EmitMetrics() { } } +// getNextAvailableSubscriptionId returns next available subscription id. Assumes the +// lock has been acquired. +func (sm *FullNodeStreamingManagerImpl) getNextAvailableSubscriptionId() uint32 { + id := uint32(0) + for _, inUse := sm.activeSubscriptionIds[id]; inUse; _, inUse = sm.activeSubscriptionIds[id] { + id = id + uint32(1) + } + return id +} + // Subscribe subscribes to the orderbook updates stream. func (sm *FullNodeStreamingManagerImpl) Subscribe( clobPairIds []uint32, @@ -188,8 +198,11 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe( for i, subaccountId := range subaccountIds { sIds[i] = *subaccountId } + + subscriptionId := sm.getNextAvailableSubscriptionId() + subscription := &OrderbookSubscription{ - subscriptionId: sm.nextSubscriptionId, + subscriptionId: subscriptionId, initialized: &atomic.Bool{}, // False by default. clobPairIds: clobPairIds, subaccountIds: sIds, @@ -204,7 +217,7 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe( } sm.clobPairIdToSubscriptionIdMapping[clobPairId] = append( sm.clobPairIdToSubscriptionIdMapping[clobPairId], - sm.nextSubscriptionId, + subscription.subscriptionId, ) } for _, subaccountId := range sIds { @@ -215,7 +228,7 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe( } sm.subaccountIdToSubscriptionIdMapping[subaccountId] = append( sm.subaccountIdToSubscriptionIdMapping[subaccountId], - sm.nextSubscriptionId, + subscription.subscriptionId, ) } @@ -228,7 +241,7 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe( ), ) sm.orderbookSubscriptions[subscription.subscriptionId] = subscription - sm.nextSubscriptionId++ + sm.activeSubscriptionIds[subscription.subscriptionId] = true sm.EmitMetrics() sm.Unlock() @@ -280,6 +293,7 @@ func (sm *FullNodeStreamingManagerImpl) removeSubscription( } close(subscription.updatesChannel) delete(sm.orderbookSubscriptions, subscriptionIdToRemove) + delete(sm.activeSubscriptionIds, subscriptionIdToRemove) // Iterate over the clobPairIdToSubscriptionIdMapping to remove the subscriptionIdToRemove for pairId, subscriptionIds := range sm.clobPairIdToSubscriptionIdMapping {