[RSDK-8666] Use Stoppable Workers #408

Open
wants to merge 37 commits into base: main
Changes from all commits
37 commits
5a92489
add TODO comments
bashar-515 Jan 22, 2025
99c5ece
lint
bashar-515 Jan 22, 2025
c8a02cf
use stoppable workers in constructor with ticker
bashar-515 Jan 22, 2025
80eb5df
use stoppable workers when initiating offer
bashar-515 Jan 22, 2025
2fd4869
replace activeBackgroundWorkers
bashar-515 Jan 22, 2025
7a7b604
no longer store context
bashar-515 Jan 22, 2025
b59f413
remove lock in constructor
bashar-515 Jan 22, 2025
7a86540
use stoppable workers in webRTC server
bashar-515 Jan 22, 2025
e57158a
no longer store context and cancel function in webRTC server
bashar-515 Jan 22, 2025
642c943
remove context comments
bashar-515 Jan 22, 2025
d46bd50
remove TODO comment
bashar-515 Jan 22, 2025
0b5c310
add stoppable workers to mongoDB webRTC call queue
bashar-515 Jan 22, 2025
41309b6
use only stoppable workers throughout constructor
bashar-515 Jan 22, 2025
54731bc
use only stoppable workers in SendOfferInit
bashar-515 Jan 22, 2025
20566aa
restore wrtc_call_queue_mongodb.go file
bashar-515 Jan 22, 2025
b2d328c
add StoppableWorkers to mongoDBWebRTCCallQueue struct
bashar-515 Jan 22, 2025
fe3a863
fix nil pointer error
bashar-515 Jan 22, 2025
d29f0e8
use StoppableWorkers throughout constructor
bashar-515 Jan 22, 2025
fac6086
no longer wrap functions passed to StoppableWorkers.Add()
bashar-515 Jan 22, 2025
ba8ce89
bring back lock
bashar-515 Jan 22, 2025
297739d
use func() wrappers again
bashar-515 Jan 22, 2025
2dc6cc5
comment out use of StoppableWorkers in constructor and use StoppableW…
bashar-515 Jan 22, 2025
58458a9
lint
bashar-515 Jan 22, 2025
c2a4c08
use StoppableWorkers in RecvOffer()
bashar-515 Jan 22, 2025
fcf72dd
lint
bashar-515 Jan 22, 2025
8fd687a
use StoppableWorkers in the rest of RecvOffer()
bashar-515 Jan 22, 2025
3a5e3c7
use StoppableWorkers in part of constructor
bashar-515 Jan 22, 2025
cc4887d
use StoppableWorkers when calling ChangeStreamManager()
bashar-515 Jan 22, 2025
d6d8aa2
lint
bashar-515 Jan 23, 2025
ef5b3d3
use StoppableWorkers when calling operatorLivenessLoop()
bashar-515 Jan 23, 2025
657a560
add TODO comment
bashar-515 Jan 23, 2025
31001ec
omit for-loop
bashar-515 Jan 23, 2025
454d29f
replace one more PanicCapturingGo() call
bashar-515 Jan 23, 2025
b203203
use StoppableWorkers in constructor
bashar-515 Jan 24, 2025
8a66f53
Revert "use StoppableWorkers in constructor"
bashar-515 Jan 24, 2025
0840eba
no longer store context in struct
bashar-515 Jan 24, 2025
7eb91ef
lint
bashar-515 Jan 24, 2025
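
Taken together, these commits replace the hand-rolled shutdown plumbing in both call queues (a stored cancelCtx/cancelFunc plus a sync.WaitGroup with manual Add/Done bookkeeping) with the StoppableWorkers helper from goutils. The sketch below shows the shape of that migration. The type names are hypothetical, and it assumes the go.viam.com/utils import path and the StoppableWorkers API that appears in the diffs (NewBackgroundStoppableWorkers, Add, Context, Stop); treat it as an illustration, not code from this PR.

package example

import (
	"context"
	"sync"

	"go.viam.com/utils"
)

// Before: each queue stored a cancelable context, its cancel func, and a
// WaitGroup, and every goroutine start needed a matching Add/Done pair.
type queueBefore struct {
	cancelCtx               context.Context
	cancelFunc              func()
	activeBackgroundWorkers sync.WaitGroup
}

func (q *queueBefore) Close() {
	q.cancelFunc()
	q.activeBackgroundWorkers.Wait()
}

// After: one *utils.StoppableWorkers owns both the context and the waiting.
type queueAfter struct {
	workers *utils.StoppableWorkers
}

func newQueueAfter() *queueAfter {
	q := &queueAfter{workers: utils.NewBackgroundStoppableWorkers()}
	q.workers.Add(func(ctx context.Context) {
		// Background work goes here, bounded by the workers' shared context.
		<-ctx.Done()
	})
	return q
}

func (q *queueAfter) Close() {
	// Stop cancels the shared context and waits for every added worker to exit.
	q.workers.Stop()
}

The practical benefit is that the context, the cancel function, and the wait-group bookkeeping collapse into a single field, so a forgotten Add/Done pair can no longer hang Close or leak a goroutine.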
57 changes: 18 additions & 39 deletions rpc/wrtc_call_queue_memory.go
@@ -18,12 +18,9 @@ import (
// testing and single node/host deployments.
type memoryWebRTCCallQueue struct {
mu sync.Mutex
activeBackgroundWorkers sync.WaitGroup
activeBackgroundWorkers *utils.StoppableWorkers
hostQueues map[string]*singleWebRTCHostQueue

cancelCtx context.Context
cancelFunc func()

uuidDeterministic bool
uuidDeterministicCounter int64
logger utils.ZapCompatibleLogger
@@ -41,42 +38,27 @@ func newMemoryWebRTCCallQueueTest(logger utils.ZapCompatibleLogger) *memoryWebRT
}

func newMemoryWebRTCCallQueue(uuidDeterministic bool, logger utils.ZapCompatibleLogger) *memoryWebRTCCallQueue {
cancelCtx, cancelFunc := context.WithCancel(context.Background())
queue := &memoryWebRTCCallQueue{
hostQueues: map[string]*singleWebRTCHostQueue{},
cancelCtx: cancelCtx,
cancelFunc: cancelFunc,
uuidDeterministic: uuidDeterministic,
logger: logger,
}
queue.activeBackgroundWorkers.Add(1)
ticker := time.NewTicker(5 * time.Second)
utils.ManagedGo(func() {
	for {
		if cancelCtx.Err() != nil {
			return
		}
		select {
		case <-cancelCtx.Done():
			return
		case <-ticker.C:
		}
		now := time.Now()
		queue.mu.Lock()
		for _, hostQueue := range queue.hostQueues {
			hostQueue.mu.Lock()
			for offerID, offer := range hostQueue.activeOffers {
				if d, ok := offer.offer.answererDoneCtx.Deadline(); ok && d.Before(now) {
					delete(hostQueue.activeOffers, offerID)
				}
			}
			hostQueue.mu.Unlock()
		}
		queue.mu.Unlock()
	}
}, func() {
	defer queue.activeBackgroundWorkers.Done()
	defer ticker.Stop()
})
queue.activeBackgroundWorkers = utils.NewStoppableWorkerWithTicker(5*time.Second, func(ctx context.Context) {
	if ctx.Err() != nil {
		return
	}
	now := time.Now()
	queue.mu.Lock()
	for _, hostQueue := range queue.hostQueues {
		hostQueue.mu.Lock()
		for offerID, offer := range hostQueue.activeOffers {
			if d, ok := offer.offer.answererDoneCtx.Deadline(); ok && d.Before(now) {
				delete(hostQueue.activeOffers, offerID)
			}
		}
		hostQueue.mu.Unlock()
	}
	queue.mu.Unlock()
})
return queue
}
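
The hunk above replaces a hand-written ticker loop with NewStoppableWorkerWithTicker. A minimal standalone sketch of that helper, assuming (as the replaced loop suggests) that it invokes the callback once per tick with the workers' context until Stop is called:

package main

import (
	"context"
	"fmt"
	"time"

	"go.viam.com/utils"
)

func main() {
	workers := utils.NewStoppableWorkerWithTicker(time.Second, func(ctx context.Context) {
		// Runs on every tick; bail out early if shutdown has already begun.
		if ctx.Err() != nil {
			return
		}
		fmt.Println("tick: expire stale offers here")
	})
	time.Sleep(3 * time.Second) // purely illustrative: let a few ticks fire
	workers.Stop()              // stops the ticker, cancels the context, waits for the callback
}

Used this way, the ticker, its cleanup, and the worker accounting are all owned by the returned *utils.StoppableWorkers rather than spread across the constructor and Close.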
@@ -113,7 +95,7 @@ func (queue *memoryWebRTCCallQueue) SendOfferInit(
}
answererResponses := make(chan WebRTCCallAnswer)
offerDeadline := time.Now().Add(getDefaultOfferDeadline())
sendCtx, sendCtxCancel := context.WithDeadline(queue.cancelCtx, offerDeadline)
sendCtx, sendCtxCancel := context.WithDeadline(queue.activeBackgroundWorkers.Context(), offerDeadline)
offer := memoryWebRTCCallOfferInit{
uuid: newUUID,
sdp: sdp,
@@ -135,9 +117,7 @@ func (queue *memoryWebRTCCallQueue) SendOfferInit(
hostQueueForSend.activeOffers[offer.uuid] = exchange
hostQueueForSend.mu.Unlock()

queue.activeBackgroundWorkers.Add(1)
utils.PanicCapturingGo(func() {
queue.activeBackgroundWorkers.Done()
queue.activeBackgroundWorkers.Add(func(ctx context.Context) {
select {
case <-sendCtx.Done():
case <-ctx.Done():
@@ -213,7 +193,7 @@ func (queue *memoryWebRTCCallQueue) SendOfferError(ctx context.Context, host, uu
func (queue *memoryWebRTCCallQueue) RecvOffer(ctx context.Context, hosts []string) (WebRTCCallOfferExchange, error) {
hostQueue := queue.getOrMakeHostsQueue(hosts)

recvCtx, recvCtxCancel := context.WithCancel(queue.cancelCtx)
recvCtx, recvCtxCancel := context.WithCancel(queue.activeBackgroundWorkers.Context())
defer recvCtxCancel()

select {
@@ -228,8 +208,7 @@ func (queue *memoryWebRTCCallQueue) RecvOffer(ctx context.Context, hosts []strin

// Close cancels all active offers and waits to cleanly close all background workers.
func (queue *memoryWebRTCCallQueue) Close() error {
queue.cancelFunc()
queue.activeBackgroundWorkers.Wait()
queue.activeBackgroundWorkers.Stop()
return nil
}

90 changes: 39 additions & 51 deletions rpc/wrtc_call_queue_mongodb.go
@@ -70,14 +70,11 @@ type mongoDBWebRTCCallQueue struct {
operatorID string
hostCallerQueueSizeMatchAggStage bson.D
hostAnswererQueueSizeMatchAggStage bson.D
activeBackgroundWorkers sync.WaitGroup
activeStoppableWorkers *utils.StoppableWorkers
callsColl *mongo.Collection
operatorsColl *mongo.Collection
logger utils.ZapCompatibleLogger

cancelCtx context.Context
cancelFunc func()

csStateMu sync.RWMutex
// this is a counter that increases based on errors / answerers or callers coming live
// and indicates whether the changestream needs to swap
@@ -189,7 +186,7 @@ func NewMongoDBWebRTCCallQueue(
return nil, err
}

cancelCtx, cancelFunc := context.WithCancel(context.Background())
activeStoppableWorkers := utils.NewBackgroundStoppableWorkers()
queue := &mongoDBWebRTCCallQueue{
operatorID: operatorID,
hostCallerQueueSizeMatchAggStage: bson.D{{"$match", bson.D{
@@ -200,33 +197,31 @@ func NewMongoDBWebRTCCallQueue(
hostAnswererQueueSizeMatchAggStage: bson.D{{"$match", bson.D{
{"answerer_size", bson.D{{"$gte", maxHostAnswerersSize * 2}}},
}}},
callsColl: callsColl,
operatorsColl: operatorsColl,
cancelCtx: cancelCtx,
cancelFunc: cancelFunc,
logger: utils.AddFieldsToLogger(logger, "operator_id", operatorID),
activeStoppableWorkers: activeStoppableWorkers,
callsColl: callsColl,
operatorsColl: operatorsColl,
logger: utils.AddFieldsToLogger(logger, "operator_id", operatorID),

csStateUpdates: make(chan changeStreamStateUpdate),
callExchangeSubs: map[string]map[*mongodbCallExchange]struct{}{},
waitingForNewCallSubs: map[string]map[*mongodbNewCallEventHandler]struct{}{},
activeAnswerersfunc: &activeAnswerersfunc,
}

queue.activeBackgroundWorkers.Add(2)
utils.ManagedGo(queue.operatorLivenessLoop, queue.activeBackgroundWorkers.Done)
utils.ManagedGo(queue.changeStreamManager, queue.activeBackgroundWorkers.Done)
// TODO(RSDK-8666): using StoppableWorkers is causing a data race
Member Author commented:
:/

Member commented:
The liveness loop is racing? Could you follow up with me offline about what the race looks like?
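
(No race report is shown in this thread. If the race reproduces under test, the usual way to capture what it looks like is Go's built-in race detector, for example go test -race ./rpc/...; the package path here is illustrative.)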

queue.activeStoppableWorkers.Add(func(ctx context.Context) { queue.operatorLivenessLoop() })
queue.activeStoppableWorkers.Add(func(ctx context.Context) { queue.changeStreamManager() })

// wait for change stream to startup once before we start processing anything
// since we need good track of resume tokens / cluster times initially
// to keep an ordering.
startOnce := make(chan struct{})
var startOnceSync sync.Once

queue.activeBackgroundWorkers.Add(1)
utils.ManagedGo(func() {
queue.activeStoppableWorkers.Add(func(ctx context.Context) {
defer queue.csManagerSeq.Add(1) // helpful on panicked restart
select {
case <-queue.cancelCtx.Done():
case <-queue.activeStoppableWorkers.Context().Done():
return
case newState := <-queue.csStateUpdates:
queue.processClusterEventState(newState.ResumeToken, newState.ClusterTime)
@@ -235,11 +230,11 @@ func NewMongoDBWebRTCCallQueue(
})
queue.subscriptionManager(newState.ChangeStream)
}
}, queue.activeBackgroundWorkers.Done)
})

select {
case <-queue.cancelCtx.Done():
return nil, multierr.Combine(queue.Close(), queue.cancelCtx.Err())
case <-queue.activeStoppableWorkers.Context().Done():
return nil, multierr.Combine(queue.Close(), queue.activeStoppableWorkers.Context().Err())
case <-startOnce:
}

Expand Down Expand Up @@ -344,7 +339,7 @@ func (queue *mongoDBWebRTCCallQueue) operatorLivenessLoop() {
ticker := time.NewTicker(operatorStateUpdateInterval)
defer ticker.Stop()
for {
if !utils.SelectContextOrWaitChan(queue.cancelCtx, ticker.C) {
if !utils.SelectContextOrWaitChan(queue.activeStoppableWorkers.Context(), ticker.C) {
return
}
type callerAnswererQueueSizes struct {
@@ -390,15 +385,16 @@ func (queue *mongoDBWebRTCCallQueue) operatorLivenessLoop() {
})
}

if _, err := queue.operatorsColl.UpdateOne(queue.cancelCtx, bson.D{{webrtcOperatorIDField, queue.operatorID}}, bson.D{
{
"$set",
bson.D{
{webrtcOperatorExpireAtField, time.Now().Add(operatorHeartbeatWindow)},
{webrtcOperatorHostsField, hostSizes},
if _, err := queue.operatorsColl.UpdateOne(queue.activeStoppableWorkers.Context(), bson.D{{webrtcOperatorIDField, queue.operatorID}},
bson.D{
{
"$set",
bson.D{
{webrtcOperatorExpireAtField, time.Now().Add(operatorHeartbeatWindow)},
{webrtcOperatorHostsField, hostSizes},
},
},
},
}); err != nil {
}); err != nil {
if !errors.Is(err, context.Canceled) {
queue.logger.Errorw("failed to update operator document for self", "error", err)
}
@@ -427,7 +423,7 @@ func (queue *mongoDBWebRTCCallQueue) changeStreamManager() {
for {
// Note(erd): this could use condition variables instead in order to be efficient about
// change stream restarts, but it does not feel worth the complexity right now :o)
if !utils.SelectContextOrWaitChan(queue.cancelCtx, ticker.C) {
if !utils.SelectContextOrWaitChan(queue.activeStoppableWorkers.Context(), ticker.C) {
return
}
currSeq := queue.csManagerSeq.Load()
@@ -460,7 +456,7 @@ func (queue *mongoDBWebRTCCallQueue) changeStreamManager() {

// note(roxy): this is updating the changestream based on whether there is a new
// answerer that is coming online or if there is a new caller that is coming online
cs, err := queue.callsColl.Watch(queue.cancelCtx, []bson.D{
cs, err := queue.callsColl.Watch(queue.activeStoppableWorkers.Context(), []bson.D{
{
{"$match", bson.D{
{"operationType", bson.D{{"$in", []interface{}{
@@ -500,11 +496,11 @@ func (queue *mongoDBWebRTCCallQueue) changeStreamManager() {
queue.csStateMu.Unlock()
activeHosts.Set(queue.operatorID, int64(len(hosts)))

nextCSCtx, nextCSCtxCancel := context.WithCancel(queue.cancelCtx)
nextCSCtx, nextCSCtxCancel := context.WithCancel(queue.activeStoppableWorkers.Context())
csNext, resumeToken, clusterTime := mongoutils.ChangeStreamBackground(nextCSCtx, cs)

select {
case <-queue.cancelCtx.Done():
case <-queue.activeStoppableWorkers.Context().Done():
// note(roxy): this is the server's cancelCtx being called
// should stop the entire call queue managed by CS, not just a single CS
nextCSCtxCancel()
@@ -677,14 +673,14 @@ func (queue *mongoDBWebRTCCallQueue) processNextSubscriptionEvent(next mongoutil
func (queue *mongoDBWebRTCCallQueue) subscriptionManager(currentCS <-chan mongoutils.ChangeEventResult) {
var waitForNextCS bool
for {
if queue.cancelCtx.Err() != nil {
if queue.activeStoppableWorkers.Context().Err() != nil {
return
}
if waitForNextCS {
// we want to block here so that we do not keep receiving bad events.
waitForNextCS = false
select {
case <-queue.cancelCtx.Done():
case <-queue.activeStoppableWorkers.Context().Done():
return
case newState := <-queue.csStateUpdates:
currentCS = newState.ChangeStream
@@ -693,7 +689,7 @@ func (queue *mongoDBWebRTCCallQueue) subscriptionManager(currentCS <-chan mongou
} else {
// otherwise we can do a quick check.
select {
case <-queue.cancelCtx.Done():
case <-queue.activeStoppableWorkers.Context().Done():
return
case next, ok := <-currentCS: // try and make some progress at least once
waitForNextCS = queue.processNextSubscriptionEvent(next, ok)
@@ -706,7 +702,7 @@ func (queue *mongoDBWebRTCCallQueue) subscriptionManager(currentCS <-chan mongou

// finally allow accepting a new change stream while checking for events.
select {
case <-queue.cancelCtx.Done():
case <-queue.activeStoppableWorkers.Context().Done():
return
case newState := <-queue.csStateUpdates:
currentCS = newState.ChangeStream
@@ -889,7 +885,7 @@ func (queue *mongoDBWebRTCCallQueue) SendOfferInit(
sendCtx, sendCtxCancel := context.WithDeadline(ctx, offerDeadline)

// need to watch before insertion to avoid a race
sendAndQueueCtx, sendAndQueueCtxCancel := utils.MergeContext(sendCtx, queue.cancelCtx)
sendAndQueueCtx, sendAndQueueCtxCancel := utils.MergeContext(sendCtx, queue.activeStoppableWorkers.Context())

cleanup := func() {
sendAndQueueCtxCancel()
@@ -923,9 +919,7 @@ func (queue *mongoDBWebRTCCallQueue) SendOfferInit(
return true
}
}
queue.activeBackgroundWorkers.Add(1)
utils.PanicCapturingGo(func() {
defer queue.activeBackgroundWorkers.Done()
queue.activeStoppableWorkers.Add(func(ctx context.Context) {
defer cleanup()
defer close(answererResponses)

@@ -1040,7 +1034,7 @@ func (queue *mongoDBWebRTCCallQueue) RecvOffer(ctx context.Context, hosts []stri
return nil, err
}

recvOfferCtx, recvOfferCtxCancel := utils.MergeContext(ctx, queue.cancelCtx)
recvOfferCtx, recvOfferCtxCancel := utils.MergeContext(ctx, queue.activeStoppableWorkers.Context())
waitForNewCall := func() (mongodbWebRTCCall, bool, error) {
events, callUnsubscribe, err := queue.subscribeForNewCallOnHosts(recvOfferCtx, hosts)
if err != nil {
@@ -1186,7 +1180,7 @@ func (queue *mongoDBWebRTCCallQueue) RecvOffer(ctx context.Context, hosts []stri

offerDeadline := callReq.StartedAt.Add(getDefaultOfferDeadline())

recvCtx, recvCtxCancel := utils.MergeContextWithDeadline(ctx, queue.cancelCtx, offerDeadline)
recvCtx, recvCtxCancel := utils.MergeContextWithDeadline(ctx, queue.activeStoppableWorkers.Context(), offerDeadline)

cleanup := func() {
recvCtxCancel()
@@ -1213,10 +1207,7 @@ func (queue *mongoDBWebRTCCallQueue) RecvOffer(ctx context.Context, hosts []stri
queue.logger.Errorw("error in RecvOffer", "error", errToSet, "id", callReq.ID)
}
// we assume the number of goroutines is bounded by the gRPC server invoking this method.
queue.activeBackgroundWorkers.Add(1)
utils.PanicCapturingGo(func() {
queue.activeBackgroundWorkers.Done()

queue.activeStoppableWorkers.Add(func(ctx context.Context) {
// we need a dedicated timeout since even if the server is shutting down,
// we want to notify other servers immediately, instead of waiting for a timeout.
updateCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
@@ -1255,9 +1246,7 @@ func (queue *mongoDBWebRTCCallQueue) RecvOffer(ctx context.Context, hosts []stri
// and trying to connect to each other
// as both are doing trickle ice and generating new candidates with SDPs that are being updated in the
// table we try each of them as they come in to make a match
queue.activeBackgroundWorkers.Add(1)
utils.PanicCapturingGo(func() {
defer queue.activeBackgroundWorkers.Done()
queue.activeStoppableWorkers.Add(func(ctx context.Context) {
defer callerDoneCancel()
defer cleanup()

@@ -1358,8 +1347,7 @@ func iceCandidateToMongo(i *webrtc.ICECandidateInit) mongodbICECandidate {

// Close cancels all active offers and waits to cleanly close all background workers.
func (queue *mongoDBWebRTCCallQueue) Close() error {
queue.cancelFunc()
queue.activeBackgroundWorkers.Wait()
queue.activeStoppableWorkers.Stop()
return nil
}

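
A pattern repeated across both files is that contexts formerly derived from the stored queue.cancelCtx are now derived from the workers' context. A small sketch of that pattern with hypothetical names, assuming utils.MergeContext behaves as it is used above (the merged context is done when either parent is done, and the second return value cancels it):

package example

import (
	"context"

	"go.viam.com/utils"
)

type queueLike struct {
	workers *utils.StoppableWorkers
}

// recvLike merges the caller's context with the workers' context, so either
// caller cancellation or queue shutdown (workers.Stop) unblocks the wait.
func (q *queueLike) recvLike(ctx context.Context) error {
	recvCtx, cancel := utils.MergeContext(ctx, q.workers.Context())
	defer cancel()

	<-recvCtx.Done() // stand-in for work bounded by both contexts
	return recvCtx.Err()
}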