Skip to content

Commit

Permalink
zmq4: add context to semaphore lock
Browse files Browse the repository at this point in the history
This change adds context to the semaphore.lock method so that read can be
unblocked when the context is canceled, instead of resulting in a deadlock.
  • Loading branch information
dewei-verkada authored Nov 10, 2021
1 parent 2435d98 commit d200121
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 7 deletions.
11 changes: 7 additions & 4 deletions msgio.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (q *qreader) rmConn(r *Conn) {
}

func (q *qreader) read(ctx context.Context, msg *Msg) error {
q.sem.lock()
q.sem.lock(ctx)
select {
case <-ctx.Done():
return ctx.Err()
Expand Down Expand Up @@ -166,7 +166,7 @@ func (mw *mwriter) rmConn(w *Conn) {
}

func (w *mwriter) write(ctx context.Context, msg Msg) error {
w.sem.lock()
w.sem.lock(ctx)
grp, _ := errgroup.WithContext(ctx)
w.mu.Lock()
for i := range w.ws {
Expand Down Expand Up @@ -199,8 +199,11 @@ func (sem *semaphore) enable() {
}
}

func (sem *semaphore) lock() {
<-sem.ready
func (sem *semaphore) lock(ctx context.Context) {
select {
case <-ctx.Done():
case <-sem.ready:
}
}

var (
Expand Down
2 changes: 1 addition & 1 deletion pub.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (q *pubQReader) rmConn(r *Conn) {
}

func (q *pubQReader) read(ctx context.Context, msg *Msg) error {
q.sem.lock()
q.sem.lock(ctx)
select {
case <-ctx.Done():
case *msg = <-q.c:
Expand Down
4 changes: 2 additions & 2 deletions router.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (q *routerQReader) rmConn(r *Conn) {
}

func (q *routerQReader) read(ctx context.Context, msg *Msg) error {
q.sem.lock()
q.sem.lock(ctx)
select {
case <-ctx.Done():
return ctx.Err()
Expand Down Expand Up @@ -224,7 +224,7 @@ func (mw *routerMWriter) rmConn(w *Conn) {
}

func (w *routerMWriter) write(ctx context.Context, msg Msg) error {
w.sem.lock()
w.sem.lock(ctx)
grp, _ := errgroup.WithContext(ctx)
w.mu.Lock()
id := msg.Frames[0]
Expand Down

0 comments on commit d200121

Please sign in to comment.