Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

支持用户设定 ping 方法 #6

Merged
merged 2 commits into from
Sep 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
# Folders
_obj
_test
.idea

# Architecture specific extensions/prefixes
*.[568vq]
Expand Down
23 changes: 14 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ Golang 实现的连接池
## 功能:

- 连接池中连接类型为`interface{}`,使得更加通用
- 链接的最大空闲时间,超时的链接将关闭丢弃,可避免空闲时链接自动失效问题
- 使用channel处理池中的链接,高效
- 连接的最大空闲时间,超时的连接将关闭丢弃,可避免空闲时连接自动失效问题
- 支持用户设定 ping 方法,检查连接的连通性,无效的连接将丢弃
- 使用channel处理池中的连接,高效

## 基本用法

Expand All @@ -17,36 +18,40 @@ Golang 实现的连接池
//factory 创建连接的方法
factory := func() (interface{}, error) { return net.Dial("tcp", "127.0.0.1:4000") }

//close 关闭链接的方法
//close 关闭连接的方法
close := func(v interface{}) error { return v.(net.Conn).Close() }

//创建一个连接池: 初始化5,最大链接30
//ping 检测连接的方法
//ping := func(v interface{}) error { return nil }

//创建一个连接池: 初始化5,最大连接30
poolConfig := &pool.PoolConfig{
InitialCap: 5,
MaxCap: 30,
Factory: factory,
Close: close,
//链接最大空闲时间,超过该时间的链接 将会关闭,可避免空闲时链接EOF,自动失效的问题
//Ping: ping,
//连接最大空闲时间,超过该时间的连接 将会关闭,可避免空闲时连接EOF,自动失效的问题
IdleTimeout: 15 * time.Second,
}
p, err := pool.NewChannelPool(poolConfig)
if err != nil {
fmt.Println("err=", err)
}

//从连接池中取得一个链接
//从连接池中取得一个连接
v, err := p.Get()

//do something
//conn=v.(net.Conn)

//将链接放回连接池中
//将连接放回连接池中
p.Put(v)

//释放连接池中的所有链接
//释放连接池中的所有连接
p.Release()

//查看当前链接中的数量
//查看当前连接中的数量
current := p.Len()


Expand Down
44 changes: 36 additions & 8 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,29 @@ import (
"time"
)

//PoolConfig 连接池相关配置
// PoolConfig 连接池相关配置
type PoolConfig struct {
//连接池中拥有的最小连接数
InitialCap int
//连接池中拥有的最大的连接数
MaxCap int
//生成连接的方法
Factory func() (interface{}, error)
//关闭链接的方法
//关闭连接的方法
Close func(interface{}) error
//链接最大空闲时间,超过该事件则将失效
//检查连接是否有效的方法
Ping func(interface{}) error
//连接最大空闲时间,超过该事件则将失效
IdleTimeout time.Duration
}

//channelPool 存放链接信息
//channelPool 存放连接信息
type channelPool struct {
mu sync.Mutex
conns chan *idleConn
factory func() (interface{}, error)
close func(interface{}) error
ping func(interface{}) error
idleTimeout time.Duration
}

Expand All @@ -35,11 +38,17 @@ type idleConn struct {
t time.Time
}

//NewChannelPool 初始化链接
//NewChannelPool 初始化连接
func NewChannelPool(poolConfig *PoolConfig) (Pool, error) {
if poolConfig.InitialCap < 0 || poolConfig.MaxCap <= 0 || poolConfig.InitialCap > poolConfig.MaxCap {
return nil, errors.New("invalid capacity settings")
}
if poolConfig.Factory == nil {
return nil, errors.New("invalid factory func settings")
}
if poolConfig.Close == nil {
return nil, errors.New("invalid close func settings")
}

c := &channelPool{
conns: make(chan *idleConn, poolConfig.MaxCap),
Expand All @@ -48,6 +57,10 @@ func NewChannelPool(poolConfig *PoolConfig) (Pool, error) {
idleTimeout: poolConfig.IdleTimeout,
}

if poolConfig.Ping != nil {
c.ping = poolConfig.Ping
}

for i := 0; i < poolConfig.InitialCap; i++ {
conn, err := c.factory()
if err != nil {
Expand Down Expand Up @@ -83,11 +96,18 @@ func (c *channelPool) Get() (interface{}, error) {
//判断是否超时,超时则丢弃
if timeout := c.idleTimeout; timeout > 0 {
if wrapConn.t.Add(timeout).Before(time.Now()) {
//丢弃并关闭该链接
//丢弃并关闭该连接
c.Close(wrapConn.conn)
continue
}
}
//判断是否失效,失效则丢弃,如果用户没有设定 ping 方法,就不检查
if c.ping != nil {
if err := c.Ping(wrapConn.conn); err != nil {
fmt.Println("conn is not able to be connected: ", err)
continue
}
}
return wrapConn.conn, nil
default:
conn, err := c.factory()
Expand Down Expand Up @@ -117,7 +137,7 @@ func (c *channelPool) Put(conn interface{}) error {
case c.conns <- &idleConn{conn: conn, t: time.Now()}:
return nil
default:
//连接池已满,直接关闭该链接
//连接池已满,直接关闭该连接
return c.Close(conn)
}
}
Expand All @@ -130,7 +150,15 @@ func (c *channelPool) Close(conn interface{}) error {
return c.close(conn)
}

//Release 释放连接池中所有链接
//Ping 检查单条连接是否有效
func (c *channelPool) Ping(conn interface{}) error {
if conn == nil {
return errors.New("connection is nil. rejecting")
}
return c.ping(conn)
}

//Release 释放连接池中所有连接
func (c *channelPool) Release() {
c.mu.Lock()
conns := c.conns
Expand Down
14 changes: 7 additions & 7 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,36 +13,36 @@ func main() {
//factory 创建连接的方法
factory := func() (interface{}, error) { return net.Dial("tcp", "127.0.0.1:80") }

//close 关闭链接的方法
//close 关闭连接的方法
close := func(v interface{}) error { return v.(net.Conn).Close() }

//创建一个连接池: 初始化5,最大链接30
//创建一个连接池: 初始化5,最大连接30
poolConfig := &pool.PoolConfig{
InitialCap: 5,
MaxCap: 30,
Factory: factory,
Close: close,
//链接最大空闲时间,超过该时间的链接 将会关闭,可避免空闲时链接EOF,自动失效的问题
//连接最大空闲时间,超过该时间的连接 将会关闭,可避免空闲时连接EOF,自动失效的问题
IdleTimeout: 15 * time.Second,
}
p, err := pool.NewChannelPool(poolConfig)
if err != nil {
fmt.Println("err=", err)
}

//从连接池中取得一个链接
//从连接池中取得一个连接
v, err := p.Get()

//do something
//conn=v.(net.Conn)

//将链接放回连接池中
//将连接放回连接池中
p.Put(v)

//释放连接池中的所有链接
//释放连接池中的所有连接
//p.Release()

//查看当前链接中的数量
//查看当前连接中的数量
current := p.Len()
fmt.Println("len=", current)
}