[RSDK-8666] Use Stoppable Workers #408
Conversation
rpc/wrtc_call_queue_memory.go
Outdated
```go
}
select {
case <-cancelCtx.Done():
if ctx.Err() != nil {
```
Removing this check results in a deadlock/hang when running tests in the `rpc` package.
Yeah, we'll still want to check the context; this looks correct. The ticker case should be removed, as you've done, too. Just see my comment about not needing the `for` loop surrounding this at all.
```diff
@@ -37,7 +37,7 @@ func newWebRTCServerChannel(
 	logger utils.ZapCompatibleLogger,
 ) *webrtcServerChannel {
 	base := newBaseChannel(
-		server.ctx,
+		server.processHeadersWorkers.Context(),
```
An artifact of no longer storing a `context.Context` inside of the `webrtcServer` struct.
Gotcha; this makes sense to me. There's a hierarchy here: the WebRTC server owns the WebRTC server channels, which own the WebRTC server streams. We want the context cancelations to propagate in that order, so I think what you have here is correct. Once `Stop()` is called on the WebRTC server, we should cancel all the way down.
Agreed, this will do the right thing. It would be more customary to just have `newWebRTCServerChannel` accept a context to bound its lifetime, but no need to change how things are structured right now.
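For illustration, a minimal sketch of that more customary shape. The parameter list and `newBaseChannel` arguments here are abridged and hypothetical, not the actual signatures:

```go
// Hypothetical variant: the caller passes in the context that bounds the
// channel's lifetime, so the constructor never reaches into the server.
func newWebRTCServerChannel(
	ctx context.Context, // e.g. server.processHeadersWorkers.Context() at the call site
	server *webrtcServer,
	logger utils.ZapCompatibleLogger,
) *webrtcServerChannel {
	base := newBaseChannel(
		ctx,
		logger,
		// ... remaining arguments unchanged ...
	)
	// ... remainder of the constructor unchanged ...
	return &webrtcServerChannel{baseChannel: base}
}
```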
rpc/wrtc_server_stream.go
Outdated
```diff
-	if err := s.ch.server.ctx.Err(); err != nil {
 		s.ch.server.processHeadersWorkers.Done()
 		// in which case we return.
+	if err := s.ch.server.processHeadersWorkers.Context().Err(); err != nil {
```
Also an artifact of removing the `webrtcServer` struct's `context.Context`. See above.
@dgottlieb do you remember why we need this context check before even taking a ticket?
I'm not seeing anything arguing that the ordering is important. I think the ordering was simply there to make the relationship between `Add`ing and the context canceling explicit, because `Stop` both cancels the context and waits on the waitgroup.

My expectation is that by moving to `StoppableWorkers`, we no longer need this if-block. `StoppableWorkers` does that for us before calling its worker function.
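As a concrete illustration, a sketch of the worker after that cleanup, keyed to the diff above (`s.processHeaders` is a hypothetical stand-in for the actual work):

```go
// The up-front ctx.Err() guard is gone: StoppableWorkers already declines
// to run the worker function once Stop has been called, so the worker only
// needs to honor ctx at its blocking points.
s.ch.server.processHeadersWorkers.Add(func(ctx context.Context) {
	s.processHeaders(ctx) // hypothetical: the actual header-processing work
})
```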
Cool; that's my thought, too. Wanna remove this whole if-block, @bashar-515?
rpc/wrtc_call_queue_mongodb.go
Outdated
This file is pretty critical to the functioning of distributed signaling for Viam. I'm ok not even making changes to it, since we haven't yet seen any issues in its usage of wait groups/contexts. If you and @dgottlieb want to get stoppable workers into this file, though, then I think we should test a local version of app + MongoDB to ensure signaling still works. Can also probably do it with an app PR that creates a temporary staging environment.
rpc/wrtc_call_queue_memory.go
Outdated
```go
	delete(hostQueue.activeOffers, offerID)
}
queue.activeBackgroundWorkers = utils.NewStoppableWorkerWithTicker(5*time.Second, func(ctx context.Context) {
	if ctx.Err() != nil {
```
Before, this was necessary to exit the loop, but it no longer serves a purpose: the stoppable workers caller checks the state/context before calling this method.
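For illustration, a sketch of the ticker worker with the redundant guard dropped (names from the diff above; the expiry body is abridged):

```go
queue.activeBackgroundWorkers = utils.NewStoppableWorkerWithTicker(
	5*time.Second,
	func(ctx context.Context) {
		// No `if ctx.Err() != nil { return }` needed here: the ticker
		// wrapper checks the context before each tick's invocation.
		// ... expire stale offers, e.g. delete(hostQueue.activeOffers, offerID) ...
	},
)
```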
```diff
-	queue.activeBackgroundWorkers.Add(1)
-	utils.PanicCapturingGo(func() {
-		queue.activeBackgroundWorkers.Done()
+	queue.activeBackgroundWorkers.Add(func(ctx context.Context) {
 		select {
 		case <-sendCtx.Done():
 		case <-ctx.Done():
```
This is no longer the same context. Before, this `ctx` was the one passed into `SendOfferInit`. Now we've shadowed that with the `activeBackgroundWorkers` context, which is already covered by the `sendCtx`.

Taking a step back: in the existing code, I'm not sure what the relationship is between the caller's context and the `queue.cancelCtx`. I *think* the `SendOfferInit` `ctx` is the gRPC request's context, in which case we'd want to revert to using that one.
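A sketch of one way to avoid the shadowing, keyed to the snippet above (assuming `sendCtx` is derived from the caller's `SendOfferInit` context):

```go
queue.activeBackgroundWorkers.Add(func(workerCtx context.Context) {
	select {
	case <-sendCtx.Done():
		// The caller's (gRPC request) deadline or cancelation fired.
	case <-workerCtx.Done():
		// The queue itself is shutting down.
	}
})
```

Renaming the worker's parameter keeps both lifetimes visible instead of silently shadowing the request context.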
Nice catch; totally missed this.
rpc/wrtc_call_queue_mongodb.go
Outdated
```go
		csStateUpdates:        make(chan changeStreamStateUpdate),
		callExchangeSubs:      map[string]map[*mongodbCallExchange]struct{}{},
		waitingForNewCallSubs: map[string]map[*mongodbNewCallEventHandler]struct{}{},
		activeAnswerersfunc:   &activeAnswerersfunc,
	}

	queue.activeBackgroundWorkers.Add(2)
	// TODO(RSDK-8666): using StoppableWorkers is causing a data race
```
Did we find any more detail about what was happening here?
rpc/wrtc_call_queue_mongodb.go
Outdated
```diff
 	defer queue.csManagerSeq.Add(1) // helpful on panicked restart
 	select {
-	case <-queue.cancelCtx.Done():
+	case <-queue.activeStoppableWorkers.Context().Done():
```
I would expect this to be replaced with the input `ctx`.
rpc/wrtc_call_queue_mongodb.go
Outdated
```diff
-	utils.ManagedGo(queue.operatorLivenessLoop, queue.activeBackgroundWorkers.Done)
-	utils.ManagedGo(queue.changeStreamManager, queue.activeBackgroundWorkers.Done)
+	// TODO(RSDK-8666): using StoppableWorkers is causing a data race
+	queue.activeStoppableWorkers.Add(func(ctx context.Context) { queue.operatorLivenessLoop() })
```
We should change the `queue.operatorLivenessLoop` and `queue.changeStreamManager` functions to accept the context, and change their selects on `queue.activeStoppableWorkers.Context()` to instead select on the function's input `ctx`.
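A minimal sketch of that shape, assuming the loop body shown in the diff below and that the loop exits when its context is done:

```go
func (queue *mongoDBWebRTCCallQueue) operatorLivenessLoop(ctx context.Context) {
	ticker := time.NewTicker(operatorStateUpdateInterval)
	defer ticker.Stop()
	for {
		// Select on the worker's own context rather than reaching back
		// into queue.activeStoppableWorkers.
		if !utils.SelectContextOrWaitChan(ctx, ticker.C) {
			return
		}
		// ... perform the liveness update ...
	}
}
```

Registration then passes the method value straight through, e.g. `queue.activeStoppableWorkers.Add(queue.operatorLivenessLoop)`.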
rpc/wrtc_call_queue_mongodb.go
Outdated
```diff
@@ -344,7 +339,7 @@ func (queue *mongoDBWebRTCCallQueue) operatorLivenessLoop() {
 	ticker := time.NewTicker(operatorStateUpdateInterval)
 	defer ticker.Stop()
 	for {
-		if !utils.SelectContextOrWaitChan(queue.cancelCtx, ticker.C) {
+		if !utils.SelectContextOrWaitChan(queue.activeStoppableWorkers.Context(), ticker.C) {
```
As per the comment on having `operatorLivenessLoop` accept a `ctx`, this is the line that caught my attention. We should just be checking the input context, not doing this lookup on the stoppable workers itself.
rpc/wrtc_call_queue_mongodb.go
Outdated
```diff
@@ -500,11 +496,11 @@ func (queue *mongoDBWebRTCCallQueue) changeStreamManager() {
 	queue.csStateMu.Unlock()
 	activeHosts.Set(queue.operatorID, int64(len(hosts)))

-	nextCSCtx, nextCSCtxCancel := context.WithCancel(queue.cancelCtx)
+	nextCSCtx, nextCSCtxCancel := context.WithCancel(queue.activeStoppableWorkers.Context())
```
`ctx`
rpc/wrtc_call_queue_mongodb.go
Outdated
```diff
@@ -235,11 +230,11 @@ func NewMongoDBWebRTCCallQueue(
 	})
 	queue.subscriptionManager(newState.ChangeStream)
```
Also pass the `ctx` into `subscriptionManager`.
rpc/wrtc_call_queue_mongodb.go
Outdated
```diff
@@ -677,14 +673,14 @@ func (queue *mongoDBWebRTCCallQueue) processNextSubscriptionEvent(next mongoutil
 func (queue *mongoDBWebRTCCallQueue) subscriptionManager(currentCS <-chan mongoutils.ChangeEventResult) {
 	var waitForNextCS bool
 	for {
-		if queue.cancelCtx.Err() != nil {
+		if queue.activeStoppableWorkers.Context().Err() != nil {
```
As per above, if we add the `ctx` to this call, we can swap out accessing `activeStoppableWorkers` here.
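Following the same pattern, a sketch with `subscriptionManager` accepting the worker context alongside its channel (body abridged from the diff above):

```go
func (queue *mongoDBWebRTCCallQueue) subscriptionManager(
	ctx context.Context,
	currentCS <-chan mongoutils.ChangeEventResult,
) {
	for {
		// Check the input context instead of queue.activeStoppableWorkers.
		if ctx.Err() != nil {
			return
		}
		// ... wait on currentCS and process the next subscription event ...
	}
}
```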
rpc/wrtc_call_queue_mongodb.go
Outdated
```diff
@@ -889,7 +885,7 @@ func (queue *mongoDBWebRTCCallQueue) SendOfferInit(
 	sendCtx, sendCtxCancel := context.WithDeadline(ctx, offerDeadline)

 	// need to watch before insertion to avoid a race
-	sendAndQueueCtx, sendAndQueueCtxCancel := utils.MergeContext(sendCtx, queue.cancelCtx)
+	sendAndQueueCtx, sendAndQueueCtxCancel := utils.MergeContext(sendCtx, queue.activeStoppableWorkers.Context())
```
I suppose we can keep this to minimize the changes as part of this refactor, but it feels clunky: we're just merging here to reduce the number of cases inside the selects below by one. And I think there are only two select statements.
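For comparison, a sketch of what those selects would look like without the merge; the `events` channel is a hypothetical stand-in for whatever each select actually waits on:

```go
select {
case <-sendCtx.Done():
	// Caller deadline or cancelation.
case <-queue.activeStoppableWorkers.Context().Done():
	// Queue shutdown; previously folded into sendAndQueueCtx by MergeContext.
case event := <-events:
	_ = event // hypothetical: handle the event
}
```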
No longer upgrading `rpc/wrtc_call_queue_mongodb.go`. Ticket tracking it here.
WIP
This PR is a continuation of #402 re: this comment from @benjirewis (i.e., it updates `rpc/wrtc_server.go`, `rpc/wrtc_call_queue_memory.go`, and `rpc/wrtc_call_queue_mongodb.go` to use `StoppableWorkers`, and some other files as a side effect).