[RSDK-8666] Use Stoppable Workers #408

Open
wants to merge 37 commits into base: main

@bashar-515 (Member) commented Jan 22, 2025

WIP

This PR continues #402, following this comment from @benjirewis: it updates rpc/wrtc_server.go, rpc/wrtc_call_queue_memory.go, and rpc/wrtc_call_queue_mongodb.go to use StoppableWorkers, and touches some other files as a side effect.

  • rpc/wrtc_server.go
  • rpc/wrtc_call_queue_memory.go
  • rpc/wrtc_call_queue_mongodb.go

@viambot added the "safe to test" label Jan 22, 2025
}
select {
case <-cancelCtx.Done():
if ctx.Err() != nil {
Member Author

Removing this check results in a deadlock/hang when running tests in the rpc package.

Member

Yeah we'll still want to check context. This looks correct. Ticker case should be removed as you've done, too. Just see my comment about not needing the for loop surrounding this at all.

@@ -37,7 +37,7 @@ func newWebRTCServerChannel(
 	logger utils.ZapCompatibleLogger,
 ) *webrtcServerChannel {
 	base := newBaseChannel(
-		server.ctx,
+		server.processHeadersWorkers.Context(),
Member Author

An artifact of no longer storing a context.Context inside of the webrtcServer struct.

Member

Gotcha; this makes sense to me. There's a hierarchy here: the WebRTC server owns WebRTC server channels, which in turn own WebRTC server streams. We want context cancelations to propagate in that order. So, I think what you have here is correct. Once Stop() is called on the WebRTC server, we should cancel all the way down.
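
To make the ownership chain concrete, here is a minimal sketch of how cancelation flows from server to channel to stream when each level derives its context from its owner. The `hierarchy` type, its fields, and `newHierarchy` are hypothetical names made up for this illustration and are not taken from this PR; only the standard `context` package is used.

```go
package main

import (
	"context"
	"fmt"
)

// hierarchy holds one context per ownership level. It is a hypothetical
// illustration, not a type from the rpc package.
type hierarchy struct {
	serverCtx  context.Context
	channelCtx context.Context
	streamCtx  context.Context

	stopServer   context.CancelFunc
	closeChannel context.CancelFunc
	closeStream  context.CancelFunc
}

// newHierarchy derives each context from its owner's context, so canceling
// an owner also cancels everything it owns.
func newHierarchy() *hierarchy {
	serverCtx, stopServer := context.WithCancel(context.Background())
	channelCtx, closeChannel := context.WithCancel(serverCtx)
	streamCtx, closeStream := context.WithCancel(channelCtx)
	return &hierarchy{
		serverCtx:    serverCtx,
		channelCtx:   channelCtx,
		streamCtx:    streamCtx,
		stopServer:   stopServer,
		closeChannel: closeChannel,
		closeStream:  closeStream,
	}
}

func main() {
	h := newHierarchy()

	// Closing a stream does not affect the channel or server that own it.
	h.closeStream()
	fmt.Println(h.serverCtx.Err(), h.channelCtx.Err(), h.streamCtx.Err())
	// prints: <nil> <nil> context canceled

	// Stopping the server cancels every level beneath it.
	h.stopServer()
	fmt.Println(h.serverCtx.Err(), h.channelCtx.Err(), h.streamCtx.Err())
	// prints: context canceled context canceled context canceled
}
```

Passing the workers' context into `newBaseChannel` plays the role of `serverCtx` in this sketch: anything derived from it is canceled once the server stops.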

if err := s.ch.server.ctx.Err(); err != nil {
s.ch.server.processHeadersWorkers.Done()
// in which case we return.
if err := s.ch.server.processHeadersWorkers.Context().Err(); err != nil {
@bashar-515 (Member Author), Jan 22, 2025

Also an artifact of removing the webrtcServer struct's context.Context. See above.


csStateUpdates: make(chan changeStreamStateUpdate),
callExchangeSubs: map[string]map[*mongodbCallExchange]struct{}{},
waitingForNewCallSubs: map[string]map[*mongodbNewCallEventHandler]struct{}{},
activeAnswerersfunc: &activeAnswerersfunc,
}

queue.activeBackgroundWorkers.Add(2)
// TODO(RSDK-8666): using StoppableWorkers is causing a data race
Member Author

:/

Member

The liveness loop is racing? Could you follow up with me offline about what the race looks like?

@bashar-515 requested a review from @benjirewis on January 23, 2025 01:12
@benjirewis (Member) left a comment

Nice! These are definitely harder than the others. We can chat offline about the data race/general design here.

-	queue.activeBackgroundWorkers.Add(1)
-	ticker := time.NewTicker(5 * time.Second)
-	utils.ManagedGo(func() {
+	queue.activeBackgroundWorkers = utils.NewStoppableWorkerWithTicker(5*time.Second, func(ctx context.Context) {
 		for {
Member

I would think that the for loop here is no longer necessary, given that the logic within it is now being called every 5s. I.e., the looping is handled by the stoppable workers object itself?


return
}

// take a ticket
select {
case s.ch.server.callTickets <- struct{}{}:
default:
s.ch.server.processHeadersWorkers.Done()
s.closeWithSendError(status.Error(codes.ResourceExhausted, "too many in-flight requests"))
return
}

s.headersReceived = true
utils.PanicCapturingGo(func() {
Member

I think this should probably be a s.ch.server.processHeadersWorkers.Add( call instead of a PanicCapturingGo call.
