Skip to content

Commit 30b18eb

Browse files
committed
added pgx rollback if context cancelled
1 parent fc0b7ba commit 30b18eb

5 files changed

+142
-24
lines changed

.golangci.yml

+2-2
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ linters:
2929
- gocritic
3030
- gocyclo
3131
- godot
32-
- goerr113
32+
- err113
3333
- gofmt
3434
- gofumpt
3535
- goimports
@@ -129,7 +129,7 @@ issues:
129129
- goconst
130130
- gomnd
131131
- containedctx
132-
- goerr113
132+
- err113
133133
- errcheck
134134
- nolintlint
135135
- forcetypeassert

drivers/pgxv4/transaction.go

+17-9
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package pgxv4
22

33
import (
44
"context"
5+
"sync"
56

67
"github.com/jackc/pgx/v4"
78

@@ -11,10 +12,19 @@ import (
1112

1213
// Transaction is trm.Transaction for pgx.Tx.
1314
type Transaction struct {
15+
mu sync.Mutex
1416
tx pgx.Tx
1517
isClosed *drivers.IsClosed
1618
}
1719

20+
func newDefaultTransaction(tx pgx.Tx) *Transaction {
21+
return &Transaction{
22+
mu: sync.Mutex{},
23+
tx: tx,
24+
isClosed: drivers.NewIsClosed(),
25+
}
26+
}
27+
1828
// NewTransaction creates trm.Transaction for pgx.Tx.
1929
func NewTransaction(
2030
ctx context.Context,
@@ -26,10 +36,7 @@ func NewTransaction(
2636
return ctx, nil, err
2737
}
2838

29-
tr := &Transaction{
30-
tx: tx,
31-
isClosed: drivers.NewIsClosed(),
32-
}
39+
tr := newDefaultTransaction(tx)
3340

3441
go tr.awaitDone(ctx)
3542

@@ -43,7 +50,7 @@ func (t *Transaction) awaitDone(ctx context.Context) {
4350

4451
select {
4552
case <-ctx.Done():
46-
t.isClosed.Close()
53+
_ = t.Rollback(ctx)
4754
case <-t.isClosed.Closed():
4855
}
4956
}
@@ -60,23 +67,24 @@ func (t *Transaction) Begin(ctx context.Context, _ trm.Settings) (context.Contex
6067
return ctx, nil, err
6168
}
6269

63-
tr := &Transaction{
64-
tx: tx,
65-
isClosed: drivers.NewIsClosed(),
66-
}
70+
tr := newDefaultTransaction(tx)
6771

6872
return ctx, tr, nil
6973
}
7074

7175
// Commit the trm.Transaction.
7276
func (t *Transaction) Commit(ctx context.Context) error {
77+
t.mu.Lock()
78+
defer t.mu.Unlock()
7379
defer t.isClosed.Close()
7480

7581
return t.tx.Commit(ctx)
7682
}
7783

7884
// Rollback the trm.Transaction.
7985
func (t *Transaction) Rollback(ctx context.Context) error {
86+
t.mu.Lock()
87+
defer t.mu.Unlock()
8088
defer t.isClosed.Close()
8189

8290
return t.tx.Rollback(ctx)

drivers/pgxv4/transaction_integration_test.go

+53-2
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ package pgxv4_test
66
import (
77
"context"
88
"fmt"
9+
"sync/atomic"
910
"testing"
11+
"time"
1012

1113
"github.com/jackc/pgx/v4"
1214
"github.com/jackc/pgx/v4/pgxpool"
@@ -33,8 +35,6 @@ func db(ctx context.Context) (*pgxpool.Pool, error) {
3335
}
3436

3537
func TestTransaction_WithRealDB(t *testing.T) {
36-
t.Parallel()
37-
3838
ctx := context.Background()
3939

4040
pool, err := db(ctx)
@@ -52,3 +52,54 @@ func TestTransaction_WithRealDB(t *testing.T) {
5252
require.ErrorIs(t, tr.Commit(ctx), pgx.ErrTxClosed)
5353
require.ErrorIs(t, tr.Rollback(ctx), pgx.ErrTxClosed)
5454
}
55+
56+
// transaction should release all resources if context is cancelled
57+
// otherwise pool.Close() is blocked forever
58+
func TestTransaction_WithRealDB_RollbackOnContextCancel(t *testing.T) {
59+
ctx := context.Background()
60+
61+
pool, err := db(ctx)
62+
require.NoError(t, err)
63+
64+
defer func() {
65+
waitPoolIsClosed(t, pool)
66+
}()
67+
68+
f := pgxv4.NewDefaultFactory(pool)
69+
70+
ctx, cancel := context.WithCancel(ctx)
71+
72+
_, tr, err := f(ctx, settings.Must())
73+
require.NoError(t, err)
74+
75+
require.True(t, tr.IsActive())
76+
77+
cancel()
78+
}
79+
80+
func waitPoolIsClosed(t *testing.T, pool *pgxpool.Pool) {
81+
const checkTick = 50 * time.Millisecond
82+
const waitDurationDeadline = 30 * time.Second
83+
84+
var poolClosed atomic.Bool
85+
poolClosed.Store(false)
86+
87+
go func() {
88+
pool.Close()
89+
poolClosed.Store(true)
90+
}()
91+
92+
require.Eventually(
93+
t,
94+
func() bool {
95+
return poolClosed.Load()
96+
},
97+
waitDurationDeadline,
98+
checkTick)
99+
100+
// https://github.com/jackc/pgx/issues/1641
101+
// pool triggerHealthCheck leaves stranded goroutines for 500ms
102+
// otherwise goleak error is triggered
103+
const waitPoolHealthCheck = 500 * time.Millisecond
104+
time.Sleep(waitPoolHealthCheck)
105+
}

drivers/pgxv5/transaction.go

+17-9
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package pgxv5
22

33
import (
44
"context"
5+
"sync"
56

67
"github.com/jackc/pgx/v5"
78

@@ -11,10 +12,19 @@ import (
1112

1213
// Transaction is trm.Transaction for pgx.Tx.
1314
type Transaction struct {
15+
mu sync.Mutex
1416
tx pgx.Tx
1517
isClosed *drivers.IsClosed
1618
}
1719

20+
func newDefaultTransaction(tx pgx.Tx) *Transaction {
21+
return &Transaction{
22+
mu: sync.Mutex{},
23+
tx: tx,
24+
isClosed: drivers.NewIsClosed(),
25+
}
26+
}
27+
1828
// NewTransaction creates trm.Transaction for pgx.Tx.
1929
func NewTransaction(
2030
ctx context.Context,
@@ -26,10 +36,7 @@ func NewTransaction(
2636
return ctx, nil, err
2737
}
2838

29-
tr := &Transaction{
30-
tx: tx,
31-
isClosed: drivers.NewIsClosed(),
32-
}
39+
tr := newDefaultTransaction(tx)
3340

3441
go tr.awaitDone(ctx)
3542

@@ -43,7 +50,7 @@ func (t *Transaction) awaitDone(ctx context.Context) {
4350

4451
select {
4552
case <-ctx.Done():
46-
t.isClosed.Close()
53+
_ = t.Rollback(ctx)
4754
case <-t.isClosed.Closed():
4855
}
4956
}
@@ -60,23 +67,24 @@ func (t *Transaction) Begin(ctx context.Context, _ trm.Settings) (context.Contex
6067
return ctx, nil, err
6168
}
6269

63-
tr := &Transaction{
64-
tx: tx,
65-
isClosed: drivers.NewIsClosed(),
66-
}
70+
tr := newDefaultTransaction(tx)
6771

6872
return ctx, tr, nil
6973
}
7074

7175
// Commit the trm.Transaction.
7276
func (t *Transaction) Commit(ctx context.Context) error {
77+
t.mu.Lock()
78+
defer t.mu.Unlock()
7379
defer t.isClosed.Close()
7480

7581
return t.tx.Commit(ctx)
7682
}
7783

7884
// Rollback the trm.Transaction.
7985
func (t *Transaction) Rollback(ctx context.Context) error {
86+
t.mu.Lock()
87+
defer t.mu.Unlock()
8088
defer t.isClosed.Close()
8189

8290
return t.tx.Rollback(ctx)

drivers/pgxv5/transaction_integration_test.go

+53-2
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ package pgxv5_test
66
import (
77
"context"
88
"fmt"
9+
"sync/atomic"
910
"testing"
11+
"time"
1012

1113
"github.com/jackc/pgx/v5"
1214
"github.com/jackc/pgx/v5/pgxpool"
@@ -33,8 +35,6 @@ func db(ctx context.Context) (*pgxpool.Pool, error) {
3335
}
3436

3537
func TestTransaction_WithRealDB(t *testing.T) {
36-
t.Parallel()
37-
3838
ctx := context.Background()
3939

4040
pool, err := db(ctx)
@@ -52,3 +52,54 @@ func TestTransaction_WithRealDB(t *testing.T) {
5252
require.ErrorIs(t, tr.Commit(ctx), pgx.ErrTxClosed)
5353
require.ErrorIs(t, tr.Rollback(ctx), pgx.ErrTxClosed)
5454
}
55+
56+
// transaction should release all resources if context is cancelled
57+
// otherwise pool.Close() is blocked forever
58+
func TestTransaction_WithRealDB_RollbackOnContextCancel(t *testing.T) {
59+
ctx := context.Background()
60+
61+
pool, err := db(ctx)
62+
require.NoError(t, err)
63+
64+
defer func() {
65+
waitPoolIsClosed(t, pool)
66+
}()
67+
68+
f := pgxv5.NewDefaultFactory(pool)
69+
70+
ctx, cancel := context.WithCancel(ctx)
71+
72+
_, tr, err := f(ctx, settings.Must())
73+
require.NoError(t, err)
74+
75+
require.True(t, tr.IsActive())
76+
77+
cancel()
78+
}
79+
80+
func waitPoolIsClosed(t *testing.T, pool *pgxpool.Pool) {
81+
const checkTick = 50 * time.Millisecond
82+
const waitDurationDeadline = 30 * time.Second
83+
84+
var poolClosed atomic.Bool
85+
poolClosed.Store(false)
86+
87+
go func() {
88+
pool.Close()
89+
poolClosed.Store(true)
90+
}()
91+
92+
require.Eventually(
93+
t,
94+
func() bool {
95+
return poolClosed.Load()
96+
},
97+
waitDurationDeadline,
98+
checkTick)
99+
100+
// https://github.com/jackc/pgx/issues/1641
101+
// pool triggerHealthCheck leaves stranded goroutines for 500ms
102+
// otherwise goleak error is triggered
103+
const waitPoolHealthCheck = 500 * time.Millisecond
104+
time.Sleep(waitPoolHealthCheck)
105+
}

0 commit comments

Comments
 (0)