diff --git a/base.go b/base.go index 87de73a4..5c43a547 100644 --- a/base.go +++ b/base.go @@ -227,8 +227,8 @@ func (db *baseDB) ExecContext(c context.Context, query interface{}, params ...in } func (db *baseDB) exec(ctx context.Context, query interface{}, params ...interface{}) (Result, error) { - wb := pool.GetWriteBuffer() - defer pool.PutWriteBuffer(wb) + wb := db.pool.GetWriteBuffer() + defer db.pool.PutWriteBuffer(wb) if err := writeQueryMsg(wb, db.fmter, query, params...); err != nil { return nil, err @@ -297,8 +297,8 @@ func (db *baseDB) QueryContext(c context.Context, model, query interface{}, para } func (db *baseDB) query(ctx context.Context, model, query interface{}, params ...interface{}) (Result, error) { - wb := pool.GetWriteBuffer() - defer pool.PutWriteBuffer(wb) + wb := db.pool.GetWriteBuffer() + defer db.pool.PutWriteBuffer(wb) if err := writeQueryMsg(wb, db.fmter, query, params...); err != nil { return nil, err @@ -374,8 +374,8 @@ func (db *baseDB) copyFrom( ) (res Result, err error) { var evt *QueryEvent - wb := pool.GetWriteBuffer() - defer pool.PutWriteBuffer(wb) + wb := db.pool.GetWriteBuffer() + defer db.pool.PutWriteBuffer(wb) if err := writeQueryMsg(wb, db.fmter, query, params...); err != nil { return nil, err @@ -456,8 +456,8 @@ func (db *baseDB) copyTo( ) (res Result, err error) { var evt *QueryEvent - wb := pool.GetWriteBuffer() - defer pool.PutWriteBuffer(wb) + wb := db.pool.GetWriteBuffer() + defer db.pool.PutWriteBuffer(wb) if err := writeQueryMsg(wb, db.fmter, query, params...); err != nil { return nil, err diff --git a/base_test.go b/base_test.go index e729cbef..0bba506c 100644 --- a/base_test.go +++ b/base_test.go @@ -12,7 +12,7 @@ import ( ) /* - The test is for testing the case that sending a cancel request when the timeout from connection comes earlier than ctx.Done(). +The test is for testing the case that sending a cancel request when the timeout from connection comes earlier than ctx.Done(). */ func Test_baseDB_withConn(t *testing.T) { b := mockBaseDB{} @@ -44,9 +44,10 @@ type mockPooler struct { } func (m *mockPooler) NewConn(ctx context.Context) (*pool.Conn, error) { - m.conn = &pool.Conn{ProcessID: 123, SecretKey: 234, Inited: true} m.mockConn = mockConn{} - m.conn.SetNetConn(&m.mockConn) + m.conn = pool.NewConn(&m.mockConn, pool.NewConnPool(&pool.Options{})) + m.conn.ProcessID = 123 + m.conn.SecretKey = 234 return m.conn, nil } @@ -83,6 +84,20 @@ func (m *mockPooler) Close() error { return nil } +func (m *mockPooler) GetWriteBuffer() *pool.WriteBuffer { + return pool.NewWriteBuffer(1024) +} + +func (m *mockPooler) PutWriteBuffer(_ *pool.WriteBuffer) { +} + +func (m *mockPooler) GetReaderContext() *pool.ReaderContext { + return pool.NewReaderContext(1024) +} + +func (m *mockPooler) PutReaderContext(_ *pool.ReaderContext) { +} + type mockPGError struct { M map[byte]string } diff --git a/internal/pool/conn.go b/internal/pool/conn.go index 91045245..543ff4d7 100644 --- a/internal/pool/conn.go +++ b/internal/pool/conn.go @@ -13,6 +13,7 @@ var noDeadline = time.Time{} type Conn struct { netConn net.Conn rd *ReaderContext + pool *ConnPool ProcessID int32 SecretKey int32 @@ -24,9 +25,10 @@ type Conn struct { Inited bool } -func NewConn(netConn net.Conn) *Conn { +func NewConn(netConn net.Conn, pool *ConnPool) *Conn { cn := &Conn{ createdAt: time.Now(), + pool: pool, } cn.SetNetConn(netConn) cn.SetUsedAt(time.Now()) @@ -57,7 +59,7 @@ func (cn *Conn) LockReader() { if cn.rd != nil { panic("not reached") } - cn.rd = NewReaderContext() + cn.rd = NewReaderContext(cn.pool.opt.ReadBufferInitialSize) cn.rd.Reset(cn.netConn) } @@ -79,8 +81,8 @@ func (cn *Conn) WithReader( rd := cn.rd if rd == nil { - rd = GetReaderContext() - defer PutReaderContext(rd) + rd = cn.pool.GetReaderContext() + defer cn.pool.PutReaderContext(rd) rd.Reset(cn.netConn) } @@ -97,8 +99,8 @@ func (cn *Conn) WithReader( func (cn *Conn) WithWriter( ctx context.Context, timeout time.Duration, fn func(wb *WriteBuffer) error, ) error { - wb := GetWriteBuffer() - defer PutWriteBuffer(wb) + wb := cn.pool.GetWriteBuffer() + defer cn.pool.PutWriteBuffer(wb) if err := fn(wb); err != nil { return err diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 59f2c72d..1001436a 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -42,6 +42,10 @@ type Pooler interface { Get(context.Context) (*Conn, error) Put(context.Context, *Conn) Remove(context.Context, *Conn, error) + GetWriteBuffer() *WriteBuffer + PutWriteBuffer(*WriteBuffer) + GetReaderContext() *ReaderContext + PutReaderContext(*ReaderContext) Len() int IdleLen() int @@ -54,12 +58,14 @@ type Options struct { Dialer func(context.Context) (net.Conn, error) OnClose func(*Conn) error - PoolSize int - MinIdleConns int - MaxConnAge time.Duration - PoolTimeout time.Duration - IdleTimeout time.Duration - IdleCheckFrequency time.Duration + PoolSize int + MinIdleConns int + ReadBufferInitialSize int + WriteBufferInitialSize int + MaxConnAge time.Duration + PoolTimeout time.Duration + IdleTimeout time.Duration + IdleCheckFrequency time.Duration } type ConnPool struct { @@ -82,6 +88,9 @@ type ConnPool struct { poolSize int idleConnsLen int + + wbPool sync.Pool + rbPool sync.Pool } var _ Pooler = (*ConnPool)(nil) @@ -93,6 +102,16 @@ func NewConnPool(opt *Options) *ConnPool { queue: make(chan struct{}, opt.PoolSize), conns: make([]*Conn, 0, opt.PoolSize), idleConns: make([]*Conn, 0, opt.PoolSize), + wbPool: sync.Pool{ + New: func() interface{} { + return NewWriteBuffer(opt.WriteBufferInitialSize) + }, + }, + rbPool: sync.Pool{ + New: func() interface{} { + return NewReaderContext(opt.ReadBufferInitialSize) + }, + }, } p.connsMu.Lock() @@ -182,7 +201,7 @@ func (p *ConnPool) dialConn(c context.Context, pooled bool) (*Conn, error) { return nil, err } - cn := NewConn(netConn) + cn := NewConn(netConn, p) cn.pooled = pooled return cn, nil } @@ -504,3 +523,23 @@ func (p *ConnPool) isStaleConn(cn *Conn) bool { return false } + +func (p *ConnPool) GetWriteBuffer() *WriteBuffer { + wb := p.wbPool.Get().(*WriteBuffer) + return wb +} + +func (p *ConnPool) PutWriteBuffer(wb *WriteBuffer) { + wb.Reset() + p.wbPool.Put(wb) +} + +func (p *ConnPool) GetReaderContext() *ReaderContext { + rd := p.rbPool.Get().(*ReaderContext) + return rd +} + +func (p *ConnPool) PutReaderContext(rd *ReaderContext) { + rd.ColumnAlloc.Reset() + p.rbPool.Put(rd) +} diff --git a/internal/pool/pool_single.go b/internal/pool/pool_single.go index 6c0cdffa..193c4cfd 100644 --- a/internal/pool/pool_single.go +++ b/internal/pool/pool_single.go @@ -61,3 +61,19 @@ func (p *SingleConnPool) IdleLen() int { func (p *SingleConnPool) Stats() *Stats { return &Stats{} } + +func (p *SingleConnPool) GetWriteBuffer() *WriteBuffer { + return p.pool.GetWriteBuffer() +} + +func (p *SingleConnPool) PutWriteBuffer(wb *WriteBuffer) { + p.pool.PutWriteBuffer(wb) +} + +func (p *SingleConnPool) GetReaderContext() *ReaderContext { + return p.pool.GetReaderContext() +} + +func (p *SingleConnPool) PutReaderContext(rd *ReaderContext) { + p.pool.PutReaderContext(rd) +} diff --git a/internal/pool/pool_sticky.go b/internal/pool/pool_sticky.go index 0415b5e8..5448af80 100644 --- a/internal/pool/pool_sticky.go +++ b/internal/pool/pool_sticky.go @@ -200,3 +200,19 @@ func (p *StickyConnPool) IdleLen() int { func (p *StickyConnPool) Stats() *Stats { return &Stats{} } + +func (p *StickyConnPool) GetWriteBuffer() *WriteBuffer { + return p.pool.GetWriteBuffer() +} + +func (p *StickyConnPool) PutWriteBuffer(wb *WriteBuffer) { + p.pool.PutWriteBuffer(wb) +} + +func (p *StickyConnPool) GetReaderContext() *ReaderContext { + return p.pool.GetReaderContext() +} + +func (p *StickyConnPool) PutReaderContext(rd *ReaderContext) { + p.pool.PutReaderContext(rd) +} diff --git a/internal/pool/reader.go b/internal/pool/reader.go index b5d00807..2d302da8 100644 --- a/internal/pool/reader.go +++ b/internal/pool/reader.go @@ -1,9 +1,5 @@ package pool -import ( - "sync" -) - type Reader interface { Buffered() int @@ -55,26 +51,9 @@ type ReaderContext struct { ColumnAlloc *ColumnAlloc } -func NewReaderContext() *ReaderContext { - const bufSize = 1 << 20 // 1mb +func NewReaderContext(bufSize int) *ReaderContext { return &ReaderContext{ BufReader: NewBufReader(bufSize), ColumnAlloc: NewColumnAlloc(), } } - -var readerPool = sync.Pool{ - New: func() interface{} { - return NewReaderContext() - }, -} - -func GetReaderContext() *ReaderContext { - rd := readerPool.Get().(*ReaderContext) - return rd -} - -func PutReaderContext(rd *ReaderContext) { - rd.ColumnAlloc.Reset() - readerPool.Put(rd) -} diff --git a/internal/pool/write_buffer.go b/internal/pool/write_buffer.go index 6981d3f4..367bddaa 100644 --- a/internal/pool/write_buffer.go +++ b/internal/pool/write_buffer.go @@ -3,27 +3,8 @@ package pool import ( "encoding/binary" "io" - "sync" ) -const defaultBufSize = 65 << 10 // 65kb - -var wbPool = sync.Pool{ - New: func() interface{} { - return NewWriteBuffer() - }, -} - -func GetWriteBuffer() *WriteBuffer { - wb := wbPool.Get().(*WriteBuffer) - return wb -} - -func PutWriteBuffer(wb *WriteBuffer) { - wb.Reset() - wbPool.Put(wb) -} - type WriteBuffer struct { Bytes []byte @@ -31,9 +12,9 @@ type WriteBuffer struct { paramStart int } -func NewWriteBuffer() *WriteBuffer { +func NewWriteBuffer(bufSize int) *WriteBuffer { return &WriteBuffer{ - Bytes: make([]byte, 0, defaultBufSize), + Bytes: make([]byte, 0, bufSize), } } diff --git a/options.go b/options.go index 60b1daa8..8355a289 100644 --- a/options.go +++ b/options.go @@ -90,6 +90,14 @@ type Options struct { // but idle connections are still discarded by the client // if IdleTimeout is set. IdleCheckFrequency time.Duration + // Connections read buffers stored in a sync.Pool to reduce allocations. + // Using this option you can adjust the initial size of the buffer. + // Default is 1 Mb. + ReadBufferInitialSize int + // Connections write buffers stored in a sync.Pool to reduce allocations. + // Using this option you can adjust the initial size of the buffer. + // Default is 64 Kb. + WriteBufferInitialSize int } func (opt *Options) init() { @@ -164,6 +172,14 @@ func (opt *Options) init() { case 0: opt.MaxRetryBackoff = 4 * time.Second } + + if opt.ReadBufferInitialSize == 0 { + opt.ReadBufferInitialSize = 1048576 // 1Mb + } + + if opt.WriteBufferInitialSize == 0 { + opt.WriteBufferInitialSize = 65536 // 64Kb + } } func env(key, defValue string) string { @@ -318,11 +334,13 @@ func newConnPool(opt *Options) *pool.ConnPool { Dialer: opt.getDialer(), OnClose: terminateConn, - PoolSize: opt.PoolSize, - MinIdleConns: opt.MinIdleConns, - MaxConnAge: opt.MaxConnAge, - PoolTimeout: opt.PoolTimeout, - IdleTimeout: opt.IdleTimeout, - IdleCheckFrequency: opt.IdleCheckFrequency, + PoolSize: opt.PoolSize, + MinIdleConns: opt.MinIdleConns, + MaxConnAge: opt.MaxConnAge, + PoolTimeout: opt.PoolTimeout, + IdleTimeout: opt.IdleTimeout, + IdleCheckFrequency: opt.IdleCheckFrequency, + ReadBufferInitialSize: opt.ReadBufferInitialSize, + WriteBufferInitialSize: opt.WriteBufferInitialSize, }) } diff --git a/tx.go b/tx.go index db444ff6..3162519a 100644 --- a/tx.go +++ b/tx.go @@ -150,8 +150,8 @@ func (tx *Tx) ExecContext(c context.Context, query interface{}, params ...interf } func (tx *Tx) exec(ctx context.Context, query interface{}, params ...interface{}) (Result, error) { - wb := pool.GetWriteBuffer() - defer pool.PutWriteBuffer(wb) + wb := tx.db.pool.GetWriteBuffer() + defer tx.db.pool.PutWriteBuffer(wb) if err := writeQueryMsg(wb, tx.db.fmter, query, params...); err != nil { return nil, err @@ -217,8 +217,8 @@ func (tx *Tx) query( query interface{}, params ...interface{}, ) (Result, error) { - wb := pool.GetWriteBuffer() - defer pool.PutWriteBuffer(wb) + wb := tx.db.pool.GetWriteBuffer() + defer tx.db.pool.PutWriteBuffer(wb) if err := writeQueryMsg(wb, tx.db.fmter, query, params...); err != nil { return nil, err