Skip to content

Commit fb417a2

Browse files
authored Aug 14, 2023
RSDK-4116: Ensure access to wheeled base left/right motors are synchronized. (viamrobotics#2667)
1 parent ee0174f commit fb417a2

File tree

2 files changed

+75
-72
lines changed

2 files changed

+75
-72
lines changed
 

‎operation/manager.go

+44-40
Original file line number | Diff line number | Diff line change
@@ -10,22 +10,24 @@ import (
1010
"go.viam.com/utils"
1111
)
1212

13-
// SingleOperationManager ensures only 1 operation is happening a time
13+
// SingleOperationManager ensures only 1 operation is happening at a time.
1414
// An operation can be nested, so if there is already an operation in progress,
1515
// it can have sub-operations without an issue.
1616
type SingleOperationManager struct {
1717
mu sync.Mutex
1818
currentOp *anOp
1919
}
2020

21-
// CancelRunning cancel's a current operation unless it's mine.
21+
// CancelRunning cancels a current operation unless it's mine.
2222
func (sm *SingleOperationManager) CancelRunning(ctx context.Context) {
2323
if ctx.Value(somCtxKeySingleOp) != nil {
2424
return
2525
}
2626
sm.mu.Lock()
2727
defer sm.mu.Unlock()
28-
sm.cancelInLock(ctx)
28+
if sm.currentOp != nil {
29+
sm.currentOp.cancelAndWaitFunc()
30+
}
2931
}
3032

3133
// OpRunning returns if there is a current operation.
@@ -41,32 +43,49 @@ const somCtxKeySingleOp = somCtxKey(iota)
4143

4244
// New creates a new operation, cancels previous, returns a new context and function to call when done.
4345
func (sm *SingleOperationManager) New(ctx context.Context) (context.Context, func()) {
44-
// handle nested ops
46+
// Handle nested ops. Note an operation set on a context by one `SingleOperationManager` can be
47+
// observed on a different instance of a `SingleOperationManager`.
4548
if ctx.Value(somCtxKeySingleOp) != nil {
4649
return ctx, func() {}
4750
}
4851

4952
sm.mu.Lock()
5053

51-
// first cancel any old operation
52-
sm.cancelInLock(ctx)
54+
// Cancel any existing operation. This blocks until the operation is completed.
55+
if sm.currentOp != nil {
56+
sm.currentOp.cancelAndWaitFunc()
57+
}
5358

54-
theOp := &anOp{}
59+
theOp := &anOp{
60+
closedCond: sync.NewCond(&sm.mu),
61+
}
5562

5663
ctx = context.WithValue(ctx, somCtxKeySingleOp, theOp)
5764

58-
theOp.ctx, theOp.cancelFunc = context.WithCancel(ctx)
65+
var newUserCtx context.Context
66+
newUserCtx, theOp.interruptFunc = context.WithCancel(ctx)
67+
theOp.cancelAndWaitFunc = func() {
68+
// Precondition: Caller must be holding `sm.mu`.
69+
//
70+
// If there are two threads competing to win a race, it's not sufficient to return once the
71+
// condition variable is signaled. We must re-check that a new operation didn't beat us to
72+
// getting the next operation slot.
73+
//
74+
// Ironically, "winning the race" in this scenario just means the "loser" is going to
75+
// immediately interrupt the winner. A future optimization could avoid this unnecessary
76+
// starting/stopping.
77+
for sm.currentOp != nil {
78+
sm.currentOp.interruptFunc()
79+
sm.currentOp.closedCond.Wait()
80+
}
81+
}
5982
sm.currentOp = theOp
6083
sm.mu.Unlock()
6184

62-
return theOp.ctx, func() {
63-
if !theOp.closed {
64-
theOp.closed = true
65-
}
85+
return newUserCtx, func() {
6686
sm.mu.Lock()
67-
if theOp == sm.currentOp {
68-
sm.currentOp = nil
69-
}
87+
theOp.closedCond.Broadcast()
88+
sm.currentOp = nil
7089
sm.mu.Unlock()
7190
}
7291
}
@@ -93,17 +112,11 @@ func (sm *SingleOperationManager) WaitTillNotPowered(ctx context.Context, pollTi
93112
) (err error) {
94113
// Defers a function that will stop and clean up if the context errors
95114
defer func(ctx context.Context) {
96-
var errStop error
97115
if errors.Is(ctx.Err(), context.Canceled) {
98-
sm.mu.Lock()
99-
oldOp := sm.currentOp == ctx.Value(somCtxKeySingleOp)
100-
sm.mu.Unlock()
101-
102-
if oldOp || sm.currentOp == nil {
103-
errStop = stop(ctx, map[string]interface{}{})
104-
}
116+
err = multierr.Combine(ctx.Err(), stop(ctx, map[string]interface{}{}))
117+
} else {
118+
err = ctx.Err()
105119
}
106-
err = multierr.Combine(ctx.Err(), errStop)
107120
}(ctx)
108121
return sm.WaitForSuccess(
109122
ctx,
@@ -139,21 +152,12 @@ func (sm *SingleOperationManager) WaitForSuccess(
139152
}
140153
}
141154

142-
func (sm *SingleOperationManager) cancelInLock(ctx context.Context) {
143-
myOp := ctx.Value(somCtxKeySingleOp)
144-
op := sm.currentOp
145-
146-
if op == nil || myOp == op {
147-
return
148-
}
149-
150-
op.cancelFunc()
151-
152-
sm.currentOp = nil
153-
}
154-
155155
type anOp struct {
156-
ctx context.Context
157-
cancelFunc context.CancelFunc
158-
closed bool
156+
// cancelAndWaitFunc waits until the `SingleOperationManager.currentOp` is empty. This will
157+
// interrupt any existing operations as necessary.
158+
cancelAndWaitFunc func()
159+
// Cancels the context of what's currently running an operation.
160+
interruptFunc context.CancelFunc
161+
// Used with `SingleOperationManager.mu`.
162+
closedCond *sync.Cond
159163
}

‎operation/manager_test.go

+31-32
Original file line number | Diff line number | Diff line change
@@ -97,22 +97,39 @@ func TestDontCancel(t *testing.T) {
9797
}
9898

9999
func TestCancelRace(t *testing.T) {
100+
// First set up the "worker" context and register an operation on
101+
// the `SingleOperationManager` instance.
100102
som := SingleOperationManager{}
101-
ctx, done := som.New(context.Background())
102-
defer done()
103-
104-
var wg sync.WaitGroup
105-
106-
wg.Add(1)
107-
go func() {
108-
_, done := som.New(context.Background())
109-
wg.Done()
110-
defer done()
111-
}()
103+
workerError := make(chan error, 1)
104+
workerCtx, somCleanupFunc := som.New(context.Background())
105+
106+
// Spin up a separate go-routine for the worker to listen for cancelation. Canceling an
107+
// operation blocks until the operation completes. This goroutine is responsible for running
108+
// `somCleanupFunc` to signal that canceling has completed.
109+
workerFunc := func() {
110+
defer somCleanupFunc()
111+
112+
select {
113+
case <-workerCtx.Done():
114+
workerError <- nil
115+
case <-time.After(5 * time.Second):
116+
workerError <- errors.New("Failed to be signaled via a cancel")
117+
}
112118

113-
som.CancelRunning(ctx)
114-
wg.Wait()
115-
test.That(t, ctx.Err(), test.ShouldNotBeNil)
119+
close(workerError)
120+
}
121+
go workerFunc()
122+
123+
// Set up a "test" context to cancel the worker.
124+
testCtx, testCleanupFunc := context.WithTimeout(context.Background(), time.Second)
125+
defer testCleanupFunc()
126+
som.CancelRunning(testCtx)
127+
// If `workerCtx.Done` was observed to be closed, the worker thread will pass a `nil` error back.
128+
test.That(t, <-workerError, test.ShouldBeNil)
129+
// When `SingleOperationManager` cancels an operation, the operation's context should be in a
130+
// "context canceled" error state.
131+
test.That(t, workerCtx.Err(), test.ShouldNotBeNil)
132+
test.That(t, workerCtx.Err(), test.ShouldEqual, context.Canceled)
116133
}
117134

118135
func TestStopCalled(t *testing.T) {
@@ -155,24 +172,6 @@ func TestErrorContainsStopAndCancel(t *testing.T) {
155172
test.That(t, errRet.Error(), test.ShouldEqual, "context canceled; Stop failed")
156173
}
157174

158-
func TestStopNotCalledOnOldContext(t *testing.T) {
159-
som := SingleOperationManager{}
160-
ctx, done := som.New(context.Background())
161-
defer done()
162-
mock := &mock{stopCount: 0}
163-
var wg sync.WaitGroup
164-
165-
wg.Add(1)
166-
go func() {
167-
som.WaitTillNotPowered(ctx, time.Second, mock, mock.stop)
168-
wg.Done()
169-
}()
170-
som.New(context.Background())
171-
wg.Wait()
172-
test.That(t, ctx.Err(), test.ShouldNotBeNil)
173-
test.That(t, mock.stopCount, test.ShouldEqual, 0)
174-
}
175-
176175
type mock struct {
177176
stopCount int
178177
}

0 commit comments

Comments
 (0)
Please sign in to comment.