Skip to content

Commit

Permalink
Merge pull request #6 from hopehook/master
Browse files Browse the repository at this point in the history
支持用户设定 ping 方法
  • Loading branch information
silenceper authored Sep 14, 2018
2 parents e6b389d + 103fea6 commit b254184
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 24 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
# Folders
_obj
_test
.idea

# Architecture specific extensions/prefixes
*.[568vq]
Expand Down
23 changes: 14 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ Golang 实现的连接池
## 功能:

- 连接池中连接类型为`interface{}`,使得更加通用
- 链接的最大空闲时间,超时的链接将关闭丢弃,可避免空闲时链接自动失效问题
- 使用channel处理池中的链接,高效
- 连接的最大空闲时间,超时的连接将关闭丢弃,可避免空闲时连接自动失效问题
- 支持用户设定 ping 方法,检查连接的连通性,无效的连接将丢弃
- 使用channel处理池中的连接,高效

## 基本用法

Expand All @@ -17,36 +18,40 @@ Golang 实现的连接池
//factory 创建连接的方法
factory := func() (interface{}, error) { return net.Dial("tcp", "127.0.0.1:4000") }

//close 关闭链接的方法
//close 关闭连接的方法
close := func(v interface{}) error { return v.(net.Conn).Close() }

//创建一个连接池: 初始化5,最大链接30
//ping 检测连接的方法
//ping := func(v interface{}) error { return nil }

//创建一个连接池: 初始化5,最大连接30
poolConfig := &pool.PoolConfig{
InitialCap: 5,
MaxCap: 30,
Factory: factory,
Close: close,
//链接最大空闲时间,超过该时间的链接 将会关闭,可避免空闲时链接EOF,自动失效的问题
//Ping: ping,
//连接最大空闲时间,超过该时间的连接 将会关闭,可避免空闲时连接EOF,自动失效的问题
IdleTimeout: 15 * time.Second,
}
p, err := pool.NewChannelPool(poolConfig)
if err != nil {
fmt.Println("err=", err)
}

//从连接池中取得一个链接
//从连接池中取得一个连接
v, err := p.Get()

//do something
//conn=v.(net.Conn)

//将链接放回连接池中
//将连接放回连接池中
p.Put(v)

//释放连接池中的所有链接
//释放连接池中的所有连接
p.Release()

//查看当前链接中的数量
//查看当前连接中的数量
current := p.Len()


Expand Down
44 changes: 36 additions & 8 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,29 @@ import (
"time"
)

//PoolConfig 连接池相关配置
// PoolConfig 连接池相关配置
type PoolConfig struct {
//连接池中拥有的最小连接数
InitialCap int
//连接池中拥有的最大的连接数
MaxCap int
//生成连接的方法
Factory func() (interface{}, error)
//关闭链接的方法
//关闭连接的方法
Close func(interface{}) error
//链接最大空闲时间,超过该事件则将失效
//检查连接是否有效的方法
Ping func(interface{}) error
//连接最大空闲时间,超过该事件则将失效
IdleTimeout time.Duration
}

//channelPool 存放链接信息
//channelPool 存放连接信息
type channelPool struct {
mu sync.Mutex
conns chan *idleConn
factory func() (interface{}, error)
close func(interface{}) error
ping func(interface{}) error
idleTimeout time.Duration
}

Expand All @@ -35,11 +38,17 @@ type idleConn struct {
t time.Time
}

//NewChannelPool 初始化链接
//NewChannelPool 初始化连接
func NewChannelPool(poolConfig *PoolConfig) (Pool, error) {
if poolConfig.InitialCap < 0 || poolConfig.MaxCap <= 0 || poolConfig.InitialCap > poolConfig.MaxCap {
return nil, errors.New("invalid capacity settings")
}
if poolConfig.Factory == nil {
return nil, errors.New("invalid factory func settings")
}
if poolConfig.Close == nil {
return nil, errors.New("invalid close func settings")
}

c := &channelPool{
conns: make(chan *idleConn, poolConfig.MaxCap),
Expand All @@ -48,6 +57,10 @@ func NewChannelPool(poolConfig *PoolConfig) (Pool, error) {
idleTimeout: poolConfig.IdleTimeout,
}

if poolConfig.Ping != nil {
c.ping = poolConfig.Ping
}

for i := 0; i < poolConfig.InitialCap; i++ {
conn, err := c.factory()
if err != nil {
Expand Down Expand Up @@ -83,11 +96,18 @@ func (c *channelPool) Get() (interface{}, error) {
//判断是否超时,超时则丢弃
if timeout := c.idleTimeout; timeout > 0 {
if wrapConn.t.Add(timeout).Before(time.Now()) {
//丢弃并关闭该链接
//丢弃并关闭该连接
c.Close(wrapConn.conn)
continue
}
}
//判断是否失效,失效则丢弃,如果用户没有设定 ping 方法,就不检查
if c.ping != nil {
if err := c.Ping(wrapConn.conn); err != nil {
fmt.Println("conn is not able to be connected: ", err)
continue
}
}
return wrapConn.conn, nil
default:
conn, err := c.factory()
Expand Down Expand Up @@ -117,7 +137,7 @@ func (c *channelPool) Put(conn interface{}) error {
case c.conns <- &idleConn{conn: conn, t: time.Now()}:
return nil
default:
//连接池已满,直接关闭该链接
//连接池已满,直接关闭该连接
return c.Close(conn)
}
}
Expand All @@ -130,7 +150,15 @@ func (c *channelPool) Close(conn interface{}) error {
return c.close(conn)
}

//Release 释放连接池中所有链接
//Ping 检查单条连接是否有效
func (c *channelPool) Ping(conn interface{}) error {
if conn == nil {
return errors.New("connection is nil. rejecting")
}
return c.ping(conn)
}

//Release 释放连接池中所有连接
func (c *channelPool) Release() {
c.mu.Lock()
conns := c.conns
Expand Down
14 changes: 7 additions & 7 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,36 +13,36 @@ func main() {
//factory 创建连接的方法
factory := func() (interface{}, error) { return net.Dial("tcp", "127.0.0.1:80") }

//close 关闭链接的方法
//close 关闭连接的方法
close := func(v interface{}) error { return v.(net.Conn).Close() }

//创建一个连接池: 初始化5,最大链接30
//创建一个连接池: 初始化5,最大连接30
poolConfig := &pool.PoolConfig{
InitialCap: 5,
MaxCap: 30,
Factory: factory,
Close: close,
//链接最大空闲时间,超过该时间的链接 将会关闭,可避免空闲时链接EOF,自动失效的问题
//连接最大空闲时间,超过该时间的连接 将会关闭,可避免空闲时连接EOF,自动失效的问题
IdleTimeout: 15 * time.Second,
}
p, err := pool.NewChannelPool(poolConfig)
if err != nil {
fmt.Println("err=", err)
}

//从连接池中取得一个链接
//从连接池中取得一个连接
v, err := p.Get()

//do something
//conn=v.(net.Conn)

//将链接放回连接池中
//将连接放回连接池中
p.Put(v)

//释放连接池中的所有链接
//释放连接池中的所有连接
//p.Release()

//查看当前链接中的数量
//查看当前连接中的数量
current := p.Len()
fmt.Println("len=", current)
}

0 comments on commit b254184

Please sign in to comment.