Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lab-igor-2 #3

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,10 @@ func NewAnalytics() *Analytics {
func (_ *Analytics) Send(ctx context.Context, message string, args ...string) {
d := delay.FromContext(ctx)
println("Analytics.Send " + d.String())
<-time.After(d)

select {
case <-ctx.Done():
println("Analytics.Send timeout")
case <-time.After(d):
}
}
92 changes: 92 additions & 0 deletions example/goroutine-leak/cmd/webserver/internal/service/foo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package service

import (
"context"
"errors"
"fmt"
)

type FooResponse struct {
BarID int
BazID int
}

type barClient interface {
GetBarID(ctx context.Context, id int) (barID int, err error)
}

type bazClient interface {
GetBazID(ctx context.Context, id int) (bazID int, err error)
}

type pool interface {
Enqueue(context.Context, func(poolCtx, taskCtx context.Context)) error
}

type Foo struct {
pool pool
bar barClient
baz bazClient
// ...
}

func NewFooService(pool pool, bar barClient, baz bazClient) *Foo {
return &Foo{
pool: pool,
bar: bar,
baz: baz,
}
}

type MyFuncType func(ctx context.Context, id int) (barID int, err error)

func (f *Foo) someFunc(con context.Context, stopCon context.Context, cancelFunc context.CancelFunc, getFunc MyFuncType, id int, result *int, err *error) chan struct{} {
finish := make(chan struct{})
_ = f.pool.Enqueue(con, func(_, _ context.Context) {

defer func() {
finish <- struct{}{}
}()

res, respErr := getFunc(con, id)
if respErr != nil {
err = &respErr
cancelFunc()
return
}

result = &res
})

select {
case <-finish:
return finish
case <-stopCon.Done():
return finish
}
}

// GET /api/v1/foo -> json FooResponse
func (f *Foo) Foo(ctx context.Context, id int) (*FooResponse, error) {
defContext, cancel := context.WithCancel(ctx)

var barID int
var bazID int

var errorBar error
var errorBaz error

a := f.someFunc(ctx, defContext, cancel, f.bar.GetBarID, id, &barID, &errorBar)
b := f.someFunc(ctx, defContext, cancel, f.baz.GetBazID, id, &bazID, &errorBaz)
d, e := <-a, <-b
fmt.Print(d, e)

if errorBar != nil || errorBaz != nil {
return nil, errors.Join(errorBar, errorBaz)
}

return &FooResponse{
BarID: barID,
BazID: bazID,
}, nil
}
27 changes: 27 additions & 0 deletions example/goroutine-leak/cmd/webserver/internal/service/foo_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package service

import (
"context"
"testing"
)

type bar struct {
}

func (bar) GetBarID(_ context.Context, _ int) (int, error) {
return 1, nil
}

type baz struct {
}

func (baz) GetBazID(_ context.Context, _ int) (int, error) {
return 2, nil
}

func TestFoo(t *testing.T) {
pool := NewPool(100)
foo := NewFooService(pool, bar{}, baz{})

_, _ = foo.Foo(context.Background(), 3)
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

125 changes: 118 additions & 7 deletions example/goroutine-leak/cmd/webserver/internal/service/user.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,114 @@
package service

import "context"
import (
"context"
"errors"
"sync"
"sync/atomic"
"time"
)

//go:generate mockgen -source=$GOFILE -destination=internal/mock/$GOFILE

type Pool struct {
tasks chan func()
stop atomic.Bool
stopCh chan struct{}
wg sync.WaitGroup

poolCtx context.Context
cancel context.CancelFunc
}

func NewPool(count int) *Pool {
poolCtx, cancel := context.WithCancel(context.Background())

pool := &Pool{
tasks: make(chan func(), count),
stop: atomic.Bool{},
stopCh: make(chan struct{}),
poolCtx: poolCtx,
cancel: cancel,
}

pool.wg.Add(count) // counter+=100

for i := 0; i < count; i++ {
go func() {
defer pool.wg.Done() // counter--

for {
select {
case task := <-pool.tasks:
task() // blocking call
case <-pool.stopCh:
return
}
}
}()
}

return pool
}

func mergeContext(poolCtx, taskCtx context.Context) context.Context {
taskCtx, cancel := context.WithCancel(taskCtx)

go func() {
select {
case <-poolCtx.Done():
cancel()
case <-taskCtx.Done():
cancel()
}
}()

return taskCtx
}

func (p *Pool) Enqueue(ctx context.Context, task func(poolCtx, taskCtx context.Context)) error {
if p.stop.Load() {
return errors.New("Pool is stopping") // ErrPoolIsStopping
}

taskCtx := context.WithoutCancel(ctx) // <-

// (2) task(/*give it a time to wrap up*/, taskCtx) // <-

select {
case p.tasks <- func() { task(p.poolCtx, taskCtx) }:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

func (p *Pool) Stop(timeout time.Duration) error {
if !p.stop.CompareAndSwap(false, true) {
return nil
}

close(p.stopCh) // send a signal to stop

p.cancel() // <-poolCtx.Done()

doneCh := make(chan struct{})

go func() {
p.wg.Wait() // hangs
close(doneCh) // send a signal that we are done
}()

select {
case <-time.After(timeout): // timeout
return errors.New("stop timeout") // ErrStopTimout
case <-doneCh:
}

return nil
}

//------------------

type UserRepository interface {
Create(ctx context.Context, name, email string) (string, error)
Expand All @@ -13,21 +121,24 @@ type Analytics interface {
type UserService struct {
repo UserRepository
analytics Analytics
pool *Pool
}

func NewUserService(repo UserRepository, analytics Analytics) *UserService {
return &UserService{repo: repo, analytics: analytics}
return &UserService{
repo: repo,
analytics: analytics,
pool: NewPool(100),
}
}

func (s *UserService) Create(ctx context.Context, name, email string) error {
// create user in the database
userID, _ := s.repo.Create(ctx, name, email)

ctx = context.WithoutCancel(ctx)

// send analytics event synchronously
// which may cause goroutine and memory leak
go s.analytics.Send(ctx, "user created", userID)
_ = s.pool.Enqueue(ctx, func(_, ctx context.Context) {
s.analytics.Send(ctx, "user created", userID)
})

return nil
}
Loading
Loading