Skip to content

Commit

Permalink
restore wrtc_call_queue_mongodb.go file
Browse files Browse the repository at this point in the history
  • Loading branch information
bashar-515 committed Jan 22, 2025
1 parent 54731bc commit 20566aa
Showing 1 changed file with 14 additions and 16 deletions.
30 changes: 14 additions & 16 deletions rpc/wrtc_call_queue_mongodb.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package rpc

// TODO(RSDK-8666): use stoppable workers

import (
"context"
"fmt"
Expand Down Expand Up @@ -73,7 +71,6 @@ type mongoDBWebRTCCallQueue struct {
hostCallerQueueSizeMatchAggStage bson.D
hostAnswererQueueSizeMatchAggStage bson.D
activeBackgroundWorkers sync.WaitGroup
activeStoppableWorkers *utils.StoppableWorkers
callsColl *mongo.Collection
operatorsColl *mongo.Collection
logger utils.ZapCompatibleLogger
Expand Down Expand Up @@ -193,7 +190,6 @@ func NewMongoDBWebRTCCallQueue(
}

cancelCtx, cancelFunc := context.WithCancel(context.Background())
activeStoppableWorkers := utils.NewStoppableWorkers(cancelCtx)
queue := &mongoDBWebRTCCallQueue{
operatorID: operatorID,
hostCallerQueueSizeMatchAggStage: bson.D{{"$match", bson.D{
Expand All @@ -204,29 +200,30 @@ func NewMongoDBWebRTCCallQueue(
hostAnswererQueueSizeMatchAggStage: bson.D{{"$match", bson.D{
{"answerer_size", bson.D{{"$gte", maxHostAnswerersSize * 2}}},
}}},
activeStoppableWorkers: activeStoppableWorkers,
callsColl: callsColl,
operatorsColl: operatorsColl,
cancelCtx: cancelCtx,
cancelFunc: cancelFunc,
logger: utils.AddFieldsToLogger(logger, "operator_id", operatorID),
callsColl: callsColl,
operatorsColl: operatorsColl,
cancelCtx: cancelCtx,
cancelFunc: cancelFunc,
logger: utils.AddFieldsToLogger(logger, "operator_id", operatorID),

csStateUpdates: make(chan changeStreamStateUpdate),
callExchangeSubs: map[string]map[*mongodbCallExchange]struct{}{},
waitingForNewCallSubs: map[string]map[*mongodbNewCallEventHandler]struct{}{},
activeAnswerersfunc: &activeAnswerersfunc,
}

queue.activeStoppableWorkers.Add(func(ctx context.Context) { queue.operatorLivenessLoop() })
queue.activeStoppableWorkers.Add(func(ctx context.Context) { queue.changeStreamManager() })
queue.activeBackgroundWorkers.Add(2)
utils.ManagedGo(queue.operatorLivenessLoop, queue.activeBackgroundWorkers.Done)
utils.ManagedGo(queue.changeStreamManager, queue.activeBackgroundWorkers.Done)

// wait for change stream to startup once before we start processing anything
// since we need good track of resume tokens / cluster times initially
// to keep an ordering.
startOnce := make(chan struct{})
var startOnceSync sync.Once

queue.activeStoppableWorkers.Add(func(ctx context.Context) {
queue.activeBackgroundWorkers.Add(1)
utils.ManagedGo(func() {
defer queue.csManagerSeq.Add(1) // helpful on panicked restart
select {
case <-queue.cancelCtx.Done():
Expand All @@ -238,7 +235,7 @@ func NewMongoDBWebRTCCallQueue(
})
queue.subscriptionManager(newState.ChangeStream)
}
})
}, queue.activeBackgroundWorkers.Done)

select {
case <-queue.cancelCtx.Done():
Expand Down Expand Up @@ -926,7 +923,9 @@ func (queue *mongoDBWebRTCCallQueue) SendOfferInit(
return true
}
}
queue.activeStoppableWorkers.Add(func(ctx context.Context) {
queue.activeBackgroundWorkers.Add(1)
utils.PanicCapturingGo(func() {
defer queue.activeBackgroundWorkers.Done()
defer cleanup()
defer close(answererResponses)

Expand Down Expand Up @@ -1361,7 +1360,6 @@ func iceCandidateToMongo(i *webrtc.ICECandidateInit) mongodbICECandidate {
func (queue *mongoDBWebRTCCallQueue) Close() error {
queue.cancelFunc()
queue.activeBackgroundWorkers.Wait()
queue.activeStoppableWorkers.Stop()
return nil
}

Expand Down

0 comments on commit 20566aa

Please sign in to comment.