[RSDK-8666] Use Stoppable Workers #408
base: main
Conversation
rpc/wrtc_call_queue_memory.go (outdated)

    }
    select {
    case <-cancelCtx.Done():
    if ctx.Err() != nil {
Removing this check results in a deadlock/hang when running tests in the rpc package.
Yeah we'll still want to check context. This looks correct. Ticker case should be removed as you've done, too. Just see my comment about not needing the for loop surrounding this at all.
    @@ -37,7 +37,7 @@ func newWebRTCServerChannel(
        logger utils.ZapCompatibleLogger,
    ) *webrtcServerChannel {
        base := newBaseChannel(
    -       server.ctx,
    +       server.processHeadersWorkers.Context(),
An artifact of no longer storing a context.Context inside of the webrtcServer struct.
Gotcha; this makes sense to me. There's a hierarchy here: the WebRTC server owns the WebRTC server channels, which own the WebRTC server streams. We want the context cancellations to propagate in that order. So, I think what you have here is correct. Once Stop() is called on the WebRTC server, we should cancel all the way down.
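To make that chain concrete, here is a minimal, self-contained sketch (the serverWorkers/channelCtx/streamCtx names are illustrative; it assumes goutils' StoppableWorkers can be built with no initial workers and that Stop cancels the context returned by Context(), as the discussion above implies):

```go
package main

import (
	"context"
	"fmt"

	"go.viam.com/utils"
)

func main() {
	// Owned by the WebRTC server; channels no longer read a ctx field off the
	// server struct, they derive their lifetime from the workers' context.
	serverWorkers := utils.NewStoppableWorkers()

	channelCtx := serverWorkers.Context()                     // channel follows the server
	streamCtx, streamCancel := context.WithCancel(channelCtx) // stream follows the channel
	defer streamCancel()

	// Stop() on the server cancels channelCtx and, transitively, streamCtx.
	serverWorkers.Stop()
	fmt.Println(channelCtx.Err(), streamCtx.Err()) // both report a canceled context
}
```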
    -   if err := s.ch.server.ctx.Err(); err != nil {
            s.ch.server.processHeadersWorkers.Done()
            // in which case we return.
    +   if err := s.ch.server.processHeadersWorkers.Context().Err(); err != nil {
Also an artifact of removing the webrtcServer struct's context.Context. See above.
        csStateUpdates:        make(chan changeStreamStateUpdate),
        callExchangeSubs:      map[string]map[*mongodbCallExchange]struct{}{},
        waitingForNewCallSubs: map[string]map[*mongodbNewCallEventHandler]struct{}{},
        activeAnswerersfunc:   &activeAnswerersfunc,
    }

    queue.activeBackgroundWorkers.Add(2)
    // TODO(RSDK-8666): using StoppableWorkers is causing a data race
:/
The liveness loop is racing? Could you follow up with me offline about what the race looks like?
Nice! These are definitely harder than the others. We can chat offline about the data race/general design here.
rpc/wrtc_call_queue_memory.go (outdated)

    -   queue.activeBackgroundWorkers.Add(1)
    -   ticker := time.NewTicker(5 * time.Second)
    -   utils.ManagedGo(func() {
    +   queue.activeBackgroundWorkers = utils.NewStoppableWorkerWithTicker(5*time.Second, func(ctx context.Context) {
            for {
I would think that the for loop here is no longer necessary given the logic within it is now being called every 5s. I.e., the looping is handled by the stoppable workers object itself?
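A hedged sketch of that suggestion, assuming NewStoppableWorkerWithTicker re-invokes its callback every period until Stop is called (the memoryCallQueue type and expireOnePass helper are illustrative stand-ins, not this repo's real code):

```go
package main

import (
	"context"
	"time"

	"go.viam.com/utils"
)

// memoryCallQueue stands in for the real queue type; only the workers field matters here.
type memoryCallQueue struct {
	activeBackgroundWorkers *utils.StoppableWorkers
}

// expireOnePass is a hypothetical helper holding one pass of the old loop body.
func (q *memoryCallQueue) expireOnePass(ctx context.Context) {}

func newMemoryCallQueue() *memoryCallQueue {
	queue := &memoryCallQueue{}
	// No surrounding for {} and no ticker select case: the ticker worker
	// supplies the 5s period, and the callback keeps only the context check.
	queue.activeBackgroundWorkers = utils.NewStoppableWorkerWithTicker(5*time.Second,
		func(ctx context.Context) {
			if ctx.Err() != nil {
				return // stopped between ticks
			}
			queue.expireOnePass(ctx)
		})
	return queue
}

func main() {
	queue := newMemoryCallQueue()
	queue.activeBackgroundWorkers.Stop() // cancels the context and waits for the worker
}
```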
rpc/wrtc_server_stream.go (outdated)

            return
        }

        // take a ticket
        select {
        case s.ch.server.callTickets <- struct{}{}:
        default:
            s.ch.server.processHeadersWorkers.Done()
            s.closeWithSendError(status.Error(codes.ResourceExhausted, "too many in-flight requests"))
            return
        }

        s.headersReceived = true
        utils.PanicCapturingGo(func() {
I think this should probably be a s.ch.server.processHeadersWorkers.Add( call instead of a PanicCapturingGo call.
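A hedged sketch of that suggestion, assuming StoppableWorkers exposes an Add method that runs the given function on a tracked goroutine with the workers' context (the header-processing body is elided; the surrounding stream/server types are not shown):

```go
package main

import (
	"context"

	"go.viam.com/utils"
)

func main() {
	processHeadersWorkers := utils.NewStoppableWorkers()

	// Rather than utils.PanicCapturingGo(func() { ... }), register the goroutine
	// with the workers so Stop() waits for it and hands it a cancellable context.
	processHeadersWorkers.Add(func(ctx context.Context) {
		// ... process the received headers, returning early if ctx is done ...
	})

	processHeadersWorkers.Stop() // waits for the header-processing goroutine to finish
}
```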
This reverts commit b203203.
Force-pushed from ca36657 to 8a66f53.
WIP
This PR is a continuation of #402 re: this comment from @benjirewis (i.e., it updates rpc/wrtc_server.go, rpc/wrtc_call_queue_memory.go, and rpc/wrtc_call_queue_mongodb.go to use StoppableWorkers, plus some other files as a side effect).
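For orientation, the overall shape of the migration might look roughly like the before/after below (a hedged sketch; doWork is a placeholder, and the 5-second tick mirrors the memory queue's liveness loop rather than any specific file in this PR):

```go
package main

import (
	"context"
	"sync"
	"time"

	"go.viam.com/utils"
)

// doWork is a placeholder for whatever the background goroutine actually does.
func doWork(ctx context.Context) {}

// before: a hand-rolled cancel context, WaitGroup, ticker, and ManagedGo goroutine.
func before() (stop func()) {
	cancelCtx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	wg.Add(1)
	utils.ManagedGo(func() {
		ticker := time.NewTicker(5 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-cancelCtx.Done():
				return
			case <-ticker.C:
				doWork(cancelCtx)
			}
		}
	}, wg.Done)
	return func() { cancel(); wg.Wait() }
}

// after: StoppableWorkers owns the context, goroutine tracking, and ticker.
func after() (stop func()) {
	workers := utils.NewStoppableWorkerWithTicker(5*time.Second, doWork)
	return workers.Stop
}

func main() {
	stop := before()
	stop()
	stop = after()
	stop()
}
```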