@@ -52,17 +52,6 @@ type promiseState struct {
52
52
// the promise leaves the unresolved state.
53
53
caller PipelineCaller
54
54
55
- // ongoingCalls counts the number of calls to caller that have not
56
- // yielded an Answer yet (but not necessarily finished).
57
- ongoingCalls int
58
- // If callsStopped is non-nil, then the promise has entered into
59
- // the pending state and is waiting for ongoingCalls to drop to zero.
60
- // After decrementing ongoingCalls, callsStopped should be closed if
61
- // ongoingCalls is zero to wake up the goroutine.
62
- //
63
- // Only Fulfill or Reject will set callsStopped.
64
- callsStopped chan struct {}
65
-
66
55
// clients is a table of promised clients created to proxy the eventual
67
56
// result's clients. Even after resolution, this table may still have
68
57
// entries until the clients are released. Cannot be read or written
@@ -157,32 +146,19 @@ func (p *Promise) Reject(e error) {
157
146
// If e != nil, then this is equivalent to p.Reject(e).
158
147
// Otherwise, it is equivalent to p.Fulfill(r).
159
148
func (p * Promise ) Resolve (r Ptr , e error ) {
160
- var (
161
- shutdownPromises []* clientPromise
162
-
163
- // We need to access some of these fields from p.state while
164
- // not holding the lock, so we store them here while holding it.
165
- // p.clients cannot be touched in the pending resolution state,
166
- // so we have exclusive access to the variable anyway.
167
- clients map [clientPath ]* clientAndPromise
168
- callsStopped chan struct {}
169
- )
149
+ var shutdownPromises []* clientPromise
170
150
171
- p .state .With (func (p * promiseState ) {
151
+ // It's ok to extract p.clients and use it while not holding the lock:
152
+ // it may not be accessed in the pending resolution state, so we have
153
+ // exclusive access to the variable anyway.
154
+ clients := mutex .With1 (& p .state , func (p * promiseState ) map [clientPath ]* clientAndPromise {
172
155
if e != nil {
173
156
p .requireUnresolved ("Reject" )
174
157
} else {
175
158
p .requireUnresolved ("Fulfill" )
176
159
}
177
160
p .caller = nil
178
-
179
- if p .ongoingCalls > 0 {
180
- p .callsStopped = make (chan struct {})
181
- }
182
-
183
- if len (p .clients ) > 0 || p .ongoingCalls > 0 {
184
- clients = p .clients
185
- }
161
+ return p .clients
186
162
})
187
163
188
164
// Pending resolution state: wait for clients to be fulfilled
@@ -194,13 +170,9 @@ func (p *Promise) Resolve(r Ptr, e error) {
194
170
shutdownPromises = append (shutdownPromises , cp .promise )
195
171
cp .promise = nil
196
172
}
197
- if callsStopped != nil {
198
- <- callsStopped
199
- }
200
173
201
174
p .state .With (func (p * promiseState ) {
202
175
// Move p into resolved state.
203
- p .callsStopped = nil
204
176
p .result , p .err = r , e
205
177
for _ , f := range p .signals {
206
178
f ()
@@ -353,17 +325,9 @@ func (ans *Answer) PipelineSend(ctx context.Context, transform []PipelineOp, s S
353
325
l := p .state .Lock ()
354
326
switch {
355
327
case l .Value ().isUnresolved ():
356
- l .Value ().ongoingCalls ++
357
328
caller := l .Value ().caller
358
329
l .Unlock ()
359
- ans , release := caller .PipelineSend (ctx , transform , s )
360
- p .state .With (func (p * promiseState ) {
361
- p .ongoingCalls --
362
- if p .ongoingCalls == 0 && p .callsStopped != nil {
363
- close (p .callsStopped )
364
- }
365
- })
366
- return ans , release
330
+ return caller .PipelineSend (ctx , transform , s )
367
331
case l .Value ().isPendingResolution ():
368
332
// Block new calls until resolved.
369
333
l .Unlock ()
@@ -389,17 +353,9 @@ func (ans *Answer) PipelineRecv(ctx context.Context, transform []PipelineOp, r R
389
353
l := p .state .Lock ()
390
354
switch {
391
355
case l .Value ().isUnresolved ():
392
- l .Value ().ongoingCalls ++
393
356
caller := l .Value ().caller
394
357
l .Unlock ()
395
- pcall := caller .PipelineRecv (ctx , transform , r )
396
- p .state .With (func (p * promiseState ) {
397
- p .ongoingCalls --
398
- if p .ongoingCalls == 0 && p .callsStopped != nil {
399
- close (p .callsStopped )
400
- }
401
- })
402
- return pcall
358
+ return caller .PipelineRecv (ctx , transform , r )
403
359
case l .Value ().isPendingResolution ():
404
360
// Block new calls until resolved.
405
361
l .Unlock ()
0 commit comments