Skip to content

Commit

Permalink
completely remove activeBackgroundWorkers and rename StoppableWorkers
Browse files Browse the repository at this point in the history
  • Loading branch information
bashar-515 committed Jan 22, 2025
1 parent 95f61ed commit 9f3914b
Showing 1 changed file with 15 additions and 17 deletions.
32 changes: 15 additions & 17 deletions rpc/wrtc_call_queue_mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ type mongoDBWebRTCCallQueue struct {
operatorID string
hostCallerQueueSizeMatchAggStage bson.D
hostAnswererQueueSizeMatchAggStage bson.D
activeBackgroundWorkers sync.WaitGroup
activeStoppableWorkers *utils.StoppableWorkers
activeBackgroundWorkers *utils.StoppableWorkers
callsColl *mongo.Collection
operatorsColl *mongo.Collection
logger utils.ZapCompatibleLogger
Expand Down Expand Up @@ -193,7 +192,7 @@ func NewMongoDBWebRTCCallQueue(
}

cancelCtx, cancelFunc := context.WithCancel(context.Background())
activeStoppableWorkers := utils.NewStoppableWorkers(cancelCtx)
activeBackgroundWorkers := utils.NewStoppableWorkers(cancelCtx)
queue := &mongoDBWebRTCCallQueue{
operatorID: operatorID,
hostCallerQueueSizeMatchAggStage: bson.D{{"$match", bson.D{
Expand All @@ -204,29 +203,29 @@ func NewMongoDBWebRTCCallQueue(
hostAnswererQueueSizeMatchAggStage: bson.D{{"$match", bson.D{
{"answerer_size", bson.D{{"$gte", maxHostAnswerersSize * 2}}},
}}},
activeStoppableWorkers: activeStoppableWorkers,
callsColl: callsColl,
operatorsColl: operatorsColl,
cancelCtx: cancelCtx,
cancelFunc: cancelFunc,
logger: utils.AddFieldsToLogger(logger, "operator_id", operatorID),
activeBackgroundWorkers: activeBackgroundWorkers,
callsColl: callsColl,
operatorsColl: operatorsColl,
cancelCtx: cancelCtx,
cancelFunc: cancelFunc,
logger: utils.AddFieldsToLogger(logger, "operator_id", operatorID),

csStateUpdates: make(chan changeStreamStateUpdate),
callExchangeSubs: map[string]map[*mongodbCallExchange]struct{}{},
waitingForNewCallSubs: map[string]map[*mongodbNewCallEventHandler]struct{}{},
activeAnswerersfunc: &activeAnswerersfunc,
}

queue.activeStoppableWorkers.Add(func(ctx context.Context) { queue.operatorLivenessLoop() })
queue.activeStoppableWorkers.Add(func(ctx context.Context) { queue.changeStreamManager() })
queue.activeBackgroundWorkers.Add(func(ctx context.Context) { queue.operatorLivenessLoop() })
queue.activeBackgroundWorkers.Add(func(ctx context.Context) { queue.changeStreamManager() })

// wait for change stream to startup once before we start processing anything
// since we need good track of resume tokens / cluster times initially
// to keep an ordering.
startOnce := make(chan struct{})
var startOnceSync sync.Once

queue.activeStoppableWorkers.Add(func(ctx context.Context) {
queue.activeBackgroundWorkers.Add(func(ctx context.Context) {
defer queue.csManagerSeq.Add(1) // helpful on panicked restart
select {
case <-queue.cancelCtx.Done():
Expand Down Expand Up @@ -926,7 +925,7 @@ func (queue *mongoDBWebRTCCallQueue) SendOfferInit(
return true
}
}
queue.activeStoppableWorkers.Add(func(ctx context.Context) {
queue.activeBackgroundWorkers.Add(func(ctx context.Context) {
defer cleanup()
defer close(answererResponses)

Expand Down Expand Up @@ -1214,7 +1213,7 @@ func (queue *mongoDBWebRTCCallQueue) RecvOffer(ctx context.Context, hosts []stri
queue.logger.Errorw("error in RecvOffer", "error", errToSet, "id", callReq.ID)
}
// we assume the number of goroutines is bounded by the gRPC server invoking this method.
queue.activeStoppableWorkers.Add(func(ctx context.Context) {
queue.activeBackgroundWorkers.Add(func(ctx context.Context) {

// we need a dedicated timeout since even if the server is shutting down,
// we want to notify other servers immediately, instead of waiting for a timeout.
Expand Down Expand Up @@ -1254,7 +1253,7 @@ func (queue *mongoDBWebRTCCallQueue) RecvOffer(ctx context.Context, hosts []stri
// and trying to connect to each other
// as both are doing trickle ice and generating new candidates with SDPs that are being updated in the
// table we try each of them as they come in to make a match
queue.activeStoppableWorkers.Add(func(ctx context.Context) {
queue.activeBackgroundWorkers.Add(func(ctx context.Context) {
defer callerDoneCancel()
defer cleanup()

Expand Down Expand Up @@ -1356,8 +1355,7 @@ func iceCandidateToMongo(i *webrtc.ICECandidateInit) mongodbICECandidate {
// Close cancels all active offers and waits to cleanly close all background workers.
func (queue *mongoDBWebRTCCallQueue) Close() error {
queue.cancelFunc()
queue.activeBackgroundWorkers.Wait()
queue.activeStoppableWorkers.Stop()
queue.activeBackgroundWorkers.Stop()
return nil
}

Expand Down

0 comments on commit 9f3914b

Please sign in to comment.