[RSDK-8666] Use Stoppable Workers #408

Merged · 41 commits · Jan 28, 2025

Changes from all commits (41 commits)
5a92489 add TODO comments (bashar-515, Jan 22, 2025)
99c5ece lint (bashar-515, Jan 22, 2025)
c8a02cf use stoppable workers in constructor with ticker (bashar-515, Jan 22, 2025)
80eb5df use stoppable workers when initiating offer (bashar-515, Jan 22, 2025)
2fd4869 replace activeBackgroundWorkers (bashar-515, Jan 22, 2025)
7a7b604 no longer store context (bashar-515, Jan 22, 2025)
b59f413 remove lock in constructor (bashar-515, Jan 22, 2025)
7a86540 use stoppable workers in webRTC server (bashar-515, Jan 22, 2025)
e57158a no longer store context and cancel function in webRTC server (bashar-515, Jan 22, 2025)
642c943 remove context comments (bashar-515, Jan 22, 2025)
d46bd50 remove TODO comment (bashar-515, Jan 22, 2025)
0b5c310 add stoppable workers to mongoDB webRTC call queue (bashar-515, Jan 22, 2025)
41309b6 use only stoppable workers throughout constructor (bashar-515, Jan 22, 2025)
54731bc use only stoppable workers in SendOfferInit (bashar-515, Jan 22, 2025)
20566aa restore wrtc_call_queue_mongodb.go file (bashar-515, Jan 22, 2025)
b2d328c add StoppableWorkers to mongoDBWebRTCCallQueue struct (bashar-515, Jan 22, 2025)
fe3a863 fix nil point error (bashar-515, Jan 22, 2025)
d29f0e8 use StoppableWorkers throughout constructor (bashar-515, Jan 22, 2025)
fac6086 no longer wrap functions passed to StoppableWorkers.Add() (bashar-515, Jan 22, 2025)
ba8ce89 bring back lock (bashar-515, Jan 22, 2025)
297739d use func() wrappers again (bashar-515, Jan 22, 2025)
2dc6cc5 comment out use of StoppableWorkers in constructor and use StoppableW… (bashar-515, Jan 22, 2025)
58458a9 lint (bashar-515, Jan 22, 2025)
c2a4c08 use StoppableWorkers in RecvOffer() (bashar-515, Jan 22, 2025)
fcf72dd lint (bashar-515, Jan 22, 2025)
8fd687a use StoppableWorkers in the rest of RecvOffer() (bashar-515, Jan 22, 2025)
3a5e3c7 use StoppableWorkers in part of constructor (bashar-515, Jan 22, 2025)
cc4887d use StoppableWorkers when calling ChangeStreamManager() (bashar-515, Jan 22, 2025)
d6d8aa2 lint (bashar-515, Jan 23, 2025)
ef5b3d3 use StoppableWorkers when calling operatorLivenessLoop() (bashar-515, Jan 23, 2025)
657a560 add TODO comment (bashar-515, Jan 23, 2025)
31001ec omit for-loop (bashar-515, Jan 23, 2025)
454d29f replace one more PanicCapturingGo() call (bashar-515, Jan 23, 2025)
b203203 use StoppableWorkers in constructor (bashar-515, Jan 24, 2025)
8a66f53 Revert "use StoppableWorkers in constructor" (bashar-515, Jan 24, 2025)
0840eba no longer store context in struct (bashar-515, Jan 24, 2025)
7eb91ef lint (bashar-515, Jan 24, 2025)
6a5d752 no longer check for context error (bashar-515, Jan 28, 2025)
8241b32 restore rpc/wrtc_call_queue_mongodb.go file (bashar-515, Jan 28, 2025)
16e2f4b no longer check parent context in ticker (bashar-515, Jan 28, 2025)
ae45b2b use caller context (bashar-515, Jan 28, 2025)
54 changes: 15 additions & 39 deletions rpc/wrtc_call_queue_memory.go
@@ -18,12 +18,9 @@ import (
 // testing and single node/host deployments.
 type memoryWebRTCCallQueue struct {
 	mu sync.Mutex
-	activeBackgroundWorkers sync.WaitGroup
+	activeBackgroundWorkers *utils.StoppableWorkers
 	hostQueues map[string]*singleWebRTCHostQueue
 
-	cancelCtx context.Context
-	cancelFunc func()
-
 	uuidDeterministic bool
 	uuidDeterministicCounter int64
 	logger utils.ZapCompatibleLogger
@@ -41,42 +38,24 @@ func newMemoryWebRTCCallQueueTest(logger utils.ZapCompatibleLogger) *memoryWebRT
 }
 
 func newMemoryWebRTCCallQueue(uuidDeterministic bool, logger utils.ZapCompatibleLogger) *memoryWebRTCCallQueue {
-	cancelCtx, cancelFunc := context.WithCancel(context.Background())
 	queue := &memoryWebRTCCallQueue{
 		hostQueues: map[string]*singleWebRTCHostQueue{},
-		cancelCtx: cancelCtx,
-		cancelFunc: cancelFunc,
 		uuidDeterministic: uuidDeterministic,
 		logger: logger,
 	}
-	queue.activeBackgroundWorkers.Add(1)
-	ticker := time.NewTicker(5 * time.Second)
-	utils.ManagedGo(func() {
-		for {
-			if cancelCtx.Err() != nil {
-				return
-			}
-			select {
-			case <-cancelCtx.Done():
-				return
-			case <-ticker.C:
-			}
-			now := time.Now()
-			queue.mu.Lock()
-			for _, hostQueue := range queue.hostQueues {
-				hostQueue.mu.Lock()
-				for offerID, offer := range hostQueue.activeOffers {
-					if d, ok := offer.offer.answererDoneCtx.Deadline(); ok && d.Before(now) {
-						delete(hostQueue.activeOffers, offerID)
-					}
-				}
-				hostQueue.mu.Unlock()
-			}
-			queue.mu.Unlock()
-		}
-	}, func() {
-		defer queue.activeBackgroundWorkers.Done()
-		defer ticker.Stop()
-	})
+	queue.activeBackgroundWorkers = utils.NewStoppableWorkerWithTicker(5*time.Second, func(ctx context.Context) {
+		now := time.Now()
+		queue.mu.Lock()
+		for _, hostQueue := range queue.hostQueues {
+			hostQueue.mu.Lock()
+			for offerID, offer := range hostQueue.activeOffers {
+				if d, ok := offer.offer.answererDoneCtx.Deadline(); ok && d.Before(now) {
+					delete(hostQueue.activeOffers, offerID)
+				}
+			}
+			hostQueue.mu.Unlock()
+		}
+		queue.mu.Unlock()
+	})
 	return queue
 }
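
For reference, a self-contained sketch of the ticker-worker pattern the hunk above adopts, assuming the go.viam.com/utils API exactly as it appears in this diff (NewStoppableWorkerWithTicker runs the callback on each tick until Stop is called); the type and field names below are illustrative stand-ins, not code from this repository:

package main

import (
	"context"
	"sync"
	"time"

	"go.viam.com/utils"
)

// expiringSet is an illustrative stand-in for memoryWebRTCCallQueue and its
// per-host maps of active offers.
type expiringSet struct {
	mu      sync.Mutex
	workers *utils.StoppableWorkers
	entries map[string]time.Time
}

func newExpiringSet() *expiringSet {
	s := &expiringSet{entries: map[string]time.Time{}}
	// Replaces the old WaitGroup + time.Ticker + ManagedGo loop: the worker owns
	// its own context, and the callback runs every 5 seconds until Stop is called.
	s.workers = utils.NewStoppableWorkerWithTicker(5*time.Second, func(ctx context.Context) {
		now := time.Now()
		s.mu.Lock()
		for id, deadline := range s.entries {
			if deadline.Before(now) {
				delete(s.entries, id) // drop expired entries, as the queue drops stale offers
			}
		}
		s.mu.Unlock()
	})
	return s
}

// Close mirrors memoryWebRTCCallQueue.Close: Stop halts the ticker worker and
// waits for any in-flight callback to finish.
func (s *expiringSet) Close() {
	s.workers.Stop()
}

func main() {
	s := newExpiringSet()
	defer s.Close()
	s.mu.Lock()
	s.entries["offer-1"] = time.Now().Add(time.Minute) // illustrative entry with a deadline
	s.mu.Unlock()
}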
@@ -113,7 +92,7 @@ func (queue *memoryWebRTCCallQueue) SendOfferInit(
 	}
 	answererResponses := make(chan WebRTCCallAnswer)
 	offerDeadline := time.Now().Add(getDefaultOfferDeadline())
-	sendCtx, sendCtxCancel := context.WithDeadline(queue.cancelCtx, offerDeadline)
+	sendCtx, sendCtxCancel := context.WithDeadline(queue.activeBackgroundWorkers.Context(), offerDeadline)
 	offer := memoryWebRTCCallOfferInit{
 		uuid: newUUID,
 		sdp: sdp,
@@ -135,9 +114,7 @@
 	hostQueueForSend.activeOffers[offer.uuid] = exchange
 	hostQueueForSend.mu.Unlock()
 
-	queue.activeBackgroundWorkers.Add(1)
-	utils.PanicCapturingGo(func() {
-		queue.activeBackgroundWorkers.Done()
+	queue.activeBackgroundWorkers.Add(func(_ context.Context) {
 		select {
 		case <-sendCtx.Done():
 		case <-ctx.Done():
Review comment (Member):
This is no longer the same context. Before, this ctx was the one passed into SendOfferInit; now we've shadowed it with the activeBackgroundWorkers context, which is already covered by the sendCtx.

Taking a step back: in the existing code, I'm not sure what the relationship is between the caller's context and queue.cancelCtx. I think* the SendOfferInit ctx is the gRPC request's context, in which case we'd want to revert to using that one.

Reply (Member, PR author):
nice catch, totally missed this
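
To illustrate the concern raised in this thread: if the closure parameter passed to Add is also named ctx, it shadows the caller's request context, and the select stops observing the caller's cancellation. A hypothetical, self-contained sketch (only the StoppableWorkers calls shown in this diff are assumed; everything else is illustrative):

package main

import (
	"context"
	"fmt"
	"time"

	"go.viam.com/utils"
)

func main() {
	workers := utils.NewBackgroundStoppableWorkers()
	defer workers.Stop()

	// ctx plays the role of the context passed into SendOfferInit (likely the
	// gRPC request context, per the review comment above).
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()

	done := make(chan struct{})

	// BAD: naming the worker parameter "ctx" shadows the caller's ctx, so the
	// select below would wait on the workers' long-lived context instead:
	//
	//	workers.Add(func(ctx context.Context) { <-ctx.Done(); close(done) })
	//
	// FIXED (what the final "use caller context" commit does): ignore the worker
	// parameter so the outer, caller-provided ctx is the one selected on.
	workers.Add(func(_ context.Context) {
		<-ctx.Done()
		close(done)
	})

	<-done
	fmt.Println("worker observed the caller's cancellation")
}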

@@ -213,7 +190,7 @@ func (queue *memoryWebRTCCallQueue) SendOfferError(ctx context.Context, host, uu
 func (queue *memoryWebRTCCallQueue) RecvOffer(ctx context.Context, hosts []string) (WebRTCCallOfferExchange, error) {
 	hostQueue := queue.getOrMakeHostsQueue(hosts)
 
-	recvCtx, recvCtxCancel := context.WithCancel(queue.cancelCtx)
+	recvCtx, recvCtxCancel := context.WithCancel(queue.activeBackgroundWorkers.Context())
 	defer recvCtxCancel()
 
 	select {
@@ -228,8 +205,7 @@ func (queue *memoryWebRTCCallQueue) RecvOffer(ctx context.Context, hosts []strin

 // Close cancels all active offers and waits to cleanly close all background workers.
 func (queue *memoryWebRTCCallQueue) Close() error {
-	queue.cancelFunc()
-	queue.activeBackgroundWorkers.Wait()
+	queue.activeBackgroundWorkers.Stop()
 	return nil
 }
 
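Taken together with the SendOfferInit and RecvOffer hunks, shutdown now flows through a single StoppableWorkers instance: sendCtx and recvCtx are derived from activeBackgroundWorkers.Context(), and Close calls Stop. A minimal sketch of the behavior this relies on, assuming (per the Close doc comment) that Stop cancels the workers' context and then waits for all added workers:

package main

import (
	"context"
	"fmt"

	"go.viam.com/utils"
)

func main() {
	workers := utils.NewBackgroundStoppableWorkers()

	// Derive a per-call context the way SendOfferInit and RecvOffer now do.
	callCtx, cancel := context.WithCancel(workers.Context())
	defer cancel()

	workers.Add(func(_ context.Context) {
		<-callCtx.Done() // unblocks once Stop cancels the shared parent context
		fmt.Println("in-flight call observed shutdown")
	})

	// The equivalent of memoryWebRTCCallQueue.Close: cancel the shared context,
	// then wait for the worker above to return.
	workers.Stop()
}
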
16 changes: 3 additions & 13 deletions rpc/wrtc_server.go
@@ -20,21 +20,14 @@ var DefaultWebRTCMaxGRPCCalls = 256

 // A webrtcServer translates gRPC frames over WebRTC data channels into gRPC calls.
 type webrtcServer struct {
-	ctx context.Context
-	cancel context.CancelFunc
 	handlers map[string]handlerFunc
 	services map[string]*serviceInfo
 	logger utils.ZapCompatibleLogger
 
 	peerConnsMu sync.Mutex
 	peerConns map[*webrtc.PeerConnection]struct{}
 
-	// processHeadersMu should be `Lock`ed in `Stop` to `Wait` on ongoing
-	// processHeaders calls (incoming method invocations). processHeaderMu should
-	// be `RLock`ed in processHeaders (allow concurrent processHeaders) to `Add`
-	// to processHeadersWorkers.
-	processHeadersMu sync.RWMutex
-	processHeadersWorkers sync.WaitGroup
+	processHeadersWorkers *utils.StoppableWorkers
 
 	callTickets chan struct{}
 
@@ -129,18 +122,15 @@ func newWebRTCServerWithInterceptorsAndUnknownStreamHandler(
 		streamInt: streamInt,
 		unknownStreamDesc: unknownStreamDesc,
 	}
-	srv.ctx, srv.cancel = context.WithCancel(context.Background())
+	srv.processHeadersWorkers = utils.NewBackgroundStoppableWorkers()
 	return srv
 }
 
 // Stop instructs the server and all handlers to stop. It returns when all handlers
 // are done executing.
 func (srv *webrtcServer) Stop() {
-	srv.cancel()
-	srv.processHeadersMu.Lock()
 	srv.logger.Info("waiting for handlers to complete")
-	srv.processHeadersWorkers.Wait()
-	srv.processHeadersMu.Unlock()
+	srv.processHeadersWorkers.Stop()
 	srv.logger.Info("handlers complete")
 	srv.logger.Info("closing lingering peer connections")
 
2 changes: 1 addition & 1 deletion rpc/wrtc_server_channel.go
@@ -37,7 +37,7 @@ func newWebRTCServerChannel(
 	logger utils.ZapCompatibleLogger,
 ) *webrtcServerChannel {
 	base := newBaseChannel(
-		server.ctx,
+		server.processHeadersWorkers.Context(),
Review comment (Member, PR author):
An artifact of no longer storing a context.Context inside of the webrtcServer struct.

Reply (Member):
Gotcha; this makes sense to me. There's a hierarchy here: the WebRTC server owns the WebRTC server channels, which own the WebRTC server streams. We want the context cancelations to propagate in that order, so I think what you have here is correct. Once Stop() is called on the WebRTC server, we should cancel all the way down.

Reply (Member):
Agreed, this will do the right thing. It would be more customary to just have newWebRTCServerChannel accept a context to bound its lifetime, but no need to change how things are structured right now.

 		peerConn,
 		dataChannel,
 		func() { server.removePeer(peerConn) },
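
A self-contained sketch of the alternative the last reviewer mentions, in which a constructor accepts a context that bounds the object's lifetime instead of reading it off a server field; the types and names here are hypothetical stand-ins, not code from this PR:

package main

import (
	"context"
	"fmt"
)

// serverChannel is a stand-in for webrtcServerChannel; the real type lives in rpc/.
type serverChannel struct {
	ctx context.Context
}

// newServerChannel follows the reviewer's suggestion: the caller passes in a
// context that bounds the channel's lifetime, instead of the constructor
// reading server.processHeadersWorkers.Context() itself.
func newServerChannel(ctx context.Context) *serverChannel {
	return &serverChannel{ctx: ctx}
}

func main() {
	// The owner (the server) derives and cancels the context, so cancelation
	// still propagates server -> channel -> stream.
	serverCtx, stop := context.WithCancel(context.Background())
	ch := newServerChannel(serverCtx)

	stop()
	<-ch.ctx.Done()
	fmt.Println("channel context canceled after server stop")
}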
14 changes: 1 addition & 13 deletions rpc/wrtc_server_stream.go
@@ -269,30 +269,18 @@ func (s *webrtcServerStream) processHeaders(headers *webrtcpb.RequestHeaders) {
 	}
 
 	s.ch.server.counters.HeadersProcessed.Add(1)
-	s.ch.server.processHeadersMu.RLock()
-	s.ch.server.processHeadersWorkers.Add(1)
-	s.ch.server.processHeadersMu.RUnlock()
-
-	// Check if context has errored: underlying server may have been `Stop`ped,
-	// in which case we mark this processHeaders worker as `Done` and return.
-	if err := s.ch.server.ctx.Err(); err != nil {
-		s.ch.server.processHeadersWorkers.Done()
-		return
-	}
 
 	// take a ticket
 	select {
 	case s.ch.server.callTickets <- struct{}{}:
 	default:
-		s.ch.server.processHeadersWorkers.Done()
 		s.closeWithSendError(status.Error(codes.ResourceExhausted, "too many in-flight requests"))
 		return
 	}
 
 	s.headersReceived = true
-	utils.PanicCapturingGo(func() {
+	s.ch.server.processHeadersWorkers.Add(func(ctx context.Context) {
 		defer func() {
-			s.ch.server.processHeadersWorkers.Done()
 			<-s.ch.server.callTickets // return a ticket
 		}()
 		if err := handlerFunc(s); err != nil {
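
Putting the server-side pieces together, here is a minimal, self-contained sketch of the lifecycle this PR moves to, assuming the go.viam.com/utils API as used in the hunks above (NewBackgroundStoppableWorkers, Add, Stop); the callTickets channel mirrors the existing semaphore in webrtcServer, and all other names are illustrative:

package main

import (
	"context"
	"fmt"
	"time"

	"go.viam.com/utils"
)

// exampleServer mirrors the shape of webrtcServer after this PR: the worker
// pool replaces the old ctx/cancel fields, the RWMutex, and the WaitGroup.
type exampleServer struct {
	workers     *utils.StoppableWorkers
	callTickets chan struct{} // bounds in-flight calls, like DefaultWebRTCMaxGRPCCalls
}

func newExampleServer(maxCalls int) *exampleServer {
	return &exampleServer{
		workers:     utils.NewBackgroundStoppableWorkers(),
		callTickets: make(chan struct{}, maxCalls),
	}
}

// handleHeaders mirrors processHeaders: take a ticket, then run the handler on
// the worker pool; the ticket is returned when the handler finishes.
func (srv *exampleServer) handleHeaders(handler func(ctx context.Context)) {
	select {
	case srv.callTickets <- struct{}{}: // take a ticket
	default:
		fmt.Println("too many in-flight requests")
		return
	}
	srv.workers.Add(func(ctx context.Context) {
		defer func() { <-srv.callTickets }() // return the ticket
		handler(ctx)
	})
}

// Stop mirrors webrtcServer.Stop: it waits for all added workers to finish.
func (srv *exampleServer) Stop() {
	srv.workers.Stop()
}

func main() {
	srv := newExampleServer(2)
	srv.handleHeaders(func(ctx context.Context) {
		select {
		case <-time.After(10 * time.Millisecond):
		case <-ctx.Done(): // canceled if Stop runs first
		}
	})
	srv.Stop()
	fmt.Println("all handlers complete")
}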