Skip to content

Commit

Permalink
Fix connReaper goroutine may leak
Browse files Browse the repository at this point in the history
  • Loading branch information
egorse committed May 14, 2024
1 parent e75c615 commit fffddf1
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 2 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ require (
golang.org/x/sync v0.3.0
golang.org/x/text v0.13.0
)

require go.uber.org/goleak v1.3.0 // indirect
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/go-zeromq/goczmq/v4 v4.2.2 h1:HAJN+i+3NW55ijMJJhk7oWxHKXgAuSBkoFfvr8bYj4U=
github.com/go-zeromq/goczmq/v4 v4.2.2/go.mod h1:Sm/lxrfxP/Oxqs0tnHD6WAhwkWrx+S+1MRrKzcxoaYE=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
Expand Down
101 changes: 101 additions & 0 deletions reaper/reaper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2024 The go-zeromq Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package reaper

import (
"context"
"errors"
"sync"
"testing"
"time"

"github.com/go-zeromq/zmq4"
"go.uber.org/goleak"
)

// TestReaper does multiple rapid Dial/Close to check that connection reaper goroutines are not leaking.
// In is in own package as goleak detects also threads from values created during init().
func TestReaper(t *testing.T) {
defer goleak.VerifyNone(t)

mu := &sync.Mutex{}
errs := []error{}

ctx, cancel := context.WithCancel(context.Background())
rep := zmq4.NewRep(ctx)
ep := "ipc://@test.rep.socket"
err := rep.Listen(ep)
if err != nil {
t.Fatal(err)
}

maxClients := 100
maxMsgs := 100
wgClients := &sync.WaitGroup{}
wgServer := &sync.WaitGroup{}
client := func() {
defer wgClients.Done()
for n := 0; n < maxMsgs; n++ {
func() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
req := zmq4.NewReq(ctx)
err := req.Dial(ep)
if err != nil {
mu.Lock()
defer mu.Unlock()
errs = append(errs, err)
return
}

err = req.Close()
if err != nil {
mu.Lock()
defer mu.Unlock()
errs = append(errs, err)
}
}()
}
}
server := func() {
defer wgServer.Done()
pong := zmq4.NewMsgString("pong")
for {
msg, err := rep.Recv()
if errors.Is(err, context.Canceled) {
break
}
if err != nil {
break
}
if string(msg.Frames[0]) != "ping" {
mu.Lock()
defer mu.Unlock()
errs = append(errs, errors.New("unexpected message"))
return
}
err = rep.Send(pong)
if err != nil {
mu.Lock()
defer mu.Unlock()
errs = append(errs, err)
}
}
}

wgServer.Add(1)
go server()
wgClients.Add(maxClients)
for n := 0; n < maxClients; n++ {
go client()
}
wgClients.Wait()
cancel()
wgServer.Wait()
rep.Close()
for _, err := range errs {
t.Fatal(err)
}
}
17 changes: 15 additions & 2 deletions socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,12 @@ func (sck *socket) topics() []string {

// Close closes the open Socket
func (sck *socket) Close() error {
// The Lock around Signal ensures the connReaper is running
// and is in sck.reaperCond.Wait()
sck.reaperCond.L.Lock()
sck.cancel()
sck.reaperCond.Signal()
sck.reaperCond.L.Unlock()

if sck.listener != nil {
defer sck.listener.Close()
Expand Down Expand Up @@ -193,7 +197,11 @@ func (sck *socket) Listen(endpoint string) error {
sck.listener = l

go sck.accept()
go sck.connReaper()
if !sck.reaperStarted {
sck.reaperCond.L.Lock()
go sck.connReaper()
sck.reaperStarted = true
}

return nil
}
Expand Down Expand Up @@ -268,6 +276,7 @@ connect:
}

if !sck.reaperStarted {
sck.reaperCond.L.Lock()
go sck.connReaper()
sck.reaperStarted = true
}
Expand Down Expand Up @@ -372,7 +381,11 @@ func (sck *socket) Timeout() time.Duration {
}

func (sck *socket) connReaper() {
sck.reaperCond.L.Lock()
// We are not locking here sck.reaperCond.L.Lock()
// as it should be locked prior starting connReaper as goroutine
// That would ensure that sck.reaperCond.Signal()
// would be delivered only when reaper goroutine is really started
// and is in sck.reaperCond.Wait()
defer sck.reaperCond.L.Unlock()

for {
Expand Down

0 comments on commit fffddf1

Please sign in to comment.