diff --git a/.gitignore b/.gitignore index daf913b..0c9e329 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ # Folders _obj _test +.idea # Architecture specific extensions/prefixes *.[568vq] diff --git a/README.md b/README.md index 39ec8c6..53a92cc 100644 --- a/README.md +++ b/README.md @@ -7,8 +7,9 @@ Golang 实现的连接池 ## 功能: - 连接池中连接类型为`interface{}`,使得更加通用 -- 链接的最大空闲时间,超时的链接将关闭丢弃,可避免空闲时链接自动失效问题 -- 使用channel处理池中的链接,高效 +- 连接的最大空闲时间,超时的连接将关闭丢弃,可避免空闲时连接自动失效问题 +- 支持用户设定 ping 方法,检查连接的连通性,无效的连接将丢弃 +- 使用channel处理池中的连接,高效 ## 基本用法 @@ -17,16 +18,20 @@ Golang 实现的连接池 //factory 创建连接的方法 factory := func() (interface{}, error) { return net.Dial("tcp", "127.0.0.1:4000") } -//close 关闭链接的方法 +//close 关闭连接的方法 close := func(v interface{}) error { return v.(net.Conn).Close() } -//创建一个连接池: 初始化5,最大链接30 +//ping 检测连接的方法 +//ping := func(v interface{}) error { return nil } + +//创建一个连接池: 初始化5,最大连接30 poolConfig := &pool.PoolConfig{ InitialCap: 5, MaxCap: 30, Factory: factory, Close: close, - //链接最大空闲时间,超过该时间的链接 将会关闭,可避免空闲时链接EOF,自动失效的问题 + //Ping: ping, + //连接最大空闲时间,超过该时间的连接 将会关闭,可避免空闲时连接EOF,自动失效的问题 IdleTimeout: 15 * time.Second, } p, err := pool.NewChannelPool(poolConfig) @@ -34,19 +39,19 @@ if err != nil { fmt.Println("err=", err) } -//从连接池中取得一个链接 +//从连接池中取得一个连接 v, err := p.Get() //do something //conn=v.(net.Conn) -//将链接放回连接池中 +//将连接放回连接池中 p.Put(v) -//释放连接池中的所有链接 +//释放连接池中的所有连接 p.Release() -//查看当前链接中的数量 +//查看当前连接中的数量 current := p.Len() diff --git a/channel.go b/channel.go index 7e36eaf..e1da7b5 100644 --- a/channel.go +++ b/channel.go @@ -7,7 +7,7 @@ import ( "time" ) -//PoolConfig 连接池相关配置 +// PoolConfig 连接池相关配置 type PoolConfig struct { //连接池中拥有的最小连接数 InitialCap int @@ -15,18 +15,21 @@ type PoolConfig struct { MaxCap int //生成连接的方法 Factory func() (interface{}, error) - //关闭链接的方法 + //关闭连接的方法 Close func(interface{}) error - //链接最大空闲时间,超过该事件则将失效 + //检查连接是否有效的方法 + Ping func(interface{}) error + //连接最大空闲时间,超过该事件则将失效 IdleTimeout time.Duration } -//channelPool 存放链接信息 +//channelPool 存放连接信息 type channelPool struct { mu sync.Mutex conns chan *idleConn factory func() (interface{}, error) close func(interface{}) error + ping func(interface{}) error idleTimeout time.Duration } @@ -35,11 +38,17 @@ type idleConn struct { t time.Time } -//NewChannelPool 初始化链接 +//NewChannelPool 初始化连接 func NewChannelPool(poolConfig *PoolConfig) (Pool, error) { if poolConfig.InitialCap < 0 || poolConfig.MaxCap <= 0 || poolConfig.InitialCap > poolConfig.MaxCap { return nil, errors.New("invalid capacity settings") } + if poolConfig.Factory == nil { + return nil, errors.New("invalid factory func settings") + } + if poolConfig.Close == nil { + return nil, errors.New("invalid close func settings") + } c := &channelPool{ conns: make(chan *idleConn, poolConfig.MaxCap), @@ -48,6 +57,10 @@ func NewChannelPool(poolConfig *PoolConfig) (Pool, error) { idleTimeout: poolConfig.IdleTimeout, } + if poolConfig.Ping != nil { + c.ping = poolConfig.Ping + } + for i := 0; i < poolConfig.InitialCap; i++ { conn, err := c.factory() if err != nil { @@ -83,11 +96,18 @@ func (c *channelPool) Get() (interface{}, error) { //判断是否超时,超时则丢弃 if timeout := c.idleTimeout; timeout > 0 { if wrapConn.t.Add(timeout).Before(time.Now()) { - //丢弃并关闭该链接 + //丢弃并关闭该连接 c.Close(wrapConn.conn) continue } } + //判断是否失效,失效则丢弃,如果用户没有设定 ping 方法,就不检查 + if c.ping != nil { + if err := c.Ping(wrapConn.conn); err != nil { + fmt.Println("conn is not able to be connected: ", err) + continue + } + } return wrapConn.conn, nil default: conn, err := c.factory() @@ -117,7 +137,7 @@ func (c *channelPool) Put(conn interface{}) error { case c.conns <- &idleConn{conn: conn, t: time.Now()}: return nil default: - //连接池已满,直接关闭该链接 + //连接池已满,直接关闭该连接 return c.Close(conn) } } @@ -130,7 +150,15 @@ func (c *channelPool) Close(conn interface{}) error { return c.close(conn) } -//Release 释放连接池中所有链接 +//Ping 检查单条连接是否有效 +func (c *channelPool) Ping(conn interface{}) error { + if conn == nil { + return errors.New("connection is nil. rejecting") + } + return c.ping(conn) +} + +//Release 释放连接池中所有连接 func (c *channelPool) Release() { c.mu.Lock() conns := c.conns diff --git a/example/main.go b/example/main.go index 96c33c0..687653c 100644 --- a/example/main.go +++ b/example/main.go @@ -13,16 +13,16 @@ func main() { //factory 创建连接的方法 factory := func() (interface{}, error) { return net.Dial("tcp", "127.0.0.1:80") } - //close 关闭链接的方法 + //close 关闭连接的方法 close := func(v interface{}) error { return v.(net.Conn).Close() } - //创建一个连接池: 初始化5,最大链接30 + //创建一个连接池: 初始化5,最大连接30 poolConfig := &pool.PoolConfig{ InitialCap: 5, MaxCap: 30, Factory: factory, Close: close, - //链接最大空闲时间,超过该时间的链接 将会关闭,可避免空闲时链接EOF,自动失效的问题 + //连接最大空闲时间,超过该时间的连接 将会关闭,可避免空闲时连接EOF,自动失效的问题 IdleTimeout: 15 * time.Second, } p, err := pool.NewChannelPool(poolConfig) @@ -30,19 +30,19 @@ func main() { fmt.Println("err=", err) } - //从连接池中取得一个链接 + //从连接池中取得一个连接 v, err := p.Get() //do something //conn=v.(net.Conn) - //将链接放回连接池中 + //将连接放回连接池中 p.Put(v) - //释放连接池中的所有链接 + //释放连接池中的所有连接 //p.Release() - //查看当前链接中的数量 + //查看当前连接中的数量 current := p.Len() fmt.Println("len=", current) }