Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix connReaper goroutine may leak #154

Merged
merged 3 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ require (
golang.org/x/sync v0.3.0
golang.org/x/text v0.13.0
)

require go.uber.org/goleak v1.3.0 // indirect
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/go-zeromq/goczmq/v4 v4.2.2 h1:HAJN+i+3NW55ijMJJhk7oWxHKXgAuSBkoFfvr8bYj4U=
github.com/go-zeromq/goczmq/v4 v4.2.2/go.mod h1:Sm/lxrfxP/Oxqs0tnHD6WAhwkWrx+S+1MRrKzcxoaYE=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
Expand Down
101 changes: 101 additions & 0 deletions leaks_test/reaper_leak_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2024 The go-zeromq Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package leaks_test

import (
"context"
"errors"
"sync"
"testing"
"time"

"github.com/go-zeromq/zmq4"
"go.uber.org/goleak"
)

// TestReaper does multiple rapid Dial/Close to check that connection reaper goroutines are not leaking.
// TestReaper is in a dedicated package as goleak detects also goroutines from values created during init().
func TestReaperLeak1(t *testing.T) {
defer goleak.VerifyNone(t)

mu := &sync.Mutex{}
errs := []error{}

ctx, cancel := context.WithCancel(context.Background())
rep := zmq4.NewRep(ctx)
ep := "ipc://@test.rep.socket"
err := rep.Listen(ep)
if err != nil {
t.Fatal(err)
}

maxClients := 100
maxMsgs := 100
wgClients := &sync.WaitGroup{}
wgServer := &sync.WaitGroup{}
client := func() {
defer wgClients.Done()
for n := 0; n < maxMsgs; n++ {
func() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
req := zmq4.NewReq(ctx)
err := req.Dial(ep)
if err != nil {
mu.Lock()
defer mu.Unlock()
errs = append(errs, err)
return
}

err = req.Close()
if err != nil {
mu.Lock()
defer mu.Unlock()
errs = append(errs, err)
}
}()
}
}
server := func() {
defer wgServer.Done()
pong := zmq4.NewMsgString("pong")
for {
msg, err := rep.Recv()
if errors.Is(err, context.Canceled) {
break
}
if err != nil {
break
}
if string(msg.Frames[0]) != "ping" {
mu.Lock()
defer mu.Unlock()
errs = append(errs, errors.New("unexpected message"))
return
}
err = rep.Send(pong)
if err != nil {
mu.Lock()
defer mu.Unlock()
errs = append(errs, err)
}
}
}

wgServer.Add(1)
go server()
wgClients.Add(maxClients)
for n := 0; n < maxClients; n++ {
go client()
}
wgClients.Wait()
cancel()
wgServer.Wait()
rep.Close()
for _, err := range errs {
t.Fatal(err)
}
}
17 changes: 15 additions & 2 deletions socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,12 @@ func (sck *socket) topics() []string {

// Close closes the open Socket
func (sck *socket) Close() error {
// The Lock around Signal ensures the connReaper is running
// and is in sck.reaperCond.Wait()
sck.reaperCond.L.Lock()
sck.cancel()
sck.reaperCond.Signal()
sck.reaperCond.L.Unlock()

if sck.listener != nil {
defer sck.listener.Close()
Expand Down Expand Up @@ -193,7 +197,11 @@ func (sck *socket) Listen(endpoint string) error {
sck.listener = l

go sck.accept()
go sck.connReaper()
if !sck.reaperStarted {
sck.reaperCond.L.Lock()
go sck.connReaper()
sck.reaperStarted = true
}

return nil
}
Expand Down Expand Up @@ -268,6 +276,7 @@ connect:
}

if !sck.reaperStarted {
sck.reaperCond.L.Lock()
go sck.connReaper()
sck.reaperStarted = true
}
Expand Down Expand Up @@ -372,7 +381,11 @@ func (sck *socket) Timeout() time.Duration {
}

func (sck *socket) connReaper() {
sck.reaperCond.L.Lock()
// We are not locking here sck.reaperCond.L.Lock()
// as it should be locked prior starting connReaper as goroutine
// That would ensure that sck.reaperCond.Signal()
// would be delivered only when reaper goroutine is really started
// and is in sck.reaperCond.Wait()
defer sck.reaperCond.L.Unlock()

for {
Expand Down
Loading