From d8593815906100d22c7b333bcc7d86d451bcaf5f Mon Sep 17 00:00:00 2001 From: Liwen Fu Date: Thu, 9 Mar 2023 15:26:47 +0800 Subject: [PATCH] session pool supports multiple nodes (#78) Co-authored-by: fuliwen --- README.md | 13 +++++++ README_ZH.md | 15 ++++++++ client/session.go | 36 +++++++++++--------- client/sessionpool.go | 31 +++++++++++++---- example/session_pool/session_pool_example.go | 35 ++++++++++++------- 5 files changed, 94 insertions(+), 36 deletions(-) diff --git a/README.md b/README.md index cc2a26e..831c60b 100644 --- a/README.md +++ b/README.md @@ -86,6 +86,7 @@ If there is no available connections and the pool reaches its max size, the all The PutBack method must be called after use ### New sessionPool +standalone ```golang @@ -97,6 +98,18 @@ config := &client.PoolConfig{ } sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false) +``` +cluster or doubleLive + +```golang + +config := &client.PoolConfig{ + UserName: user, + Password: password, + NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","), + } +sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false) + ``` ### Get session through sessionPool, putback after use diff --git a/README_ZH.md b/README_ZH.md index 13d4ff8..3e88438 100644 --- a/README_ZH.md +++ b/README_ZH.md @@ -69,6 +69,7 @@ go run session_example.go ### 创建sessionPool +单实例 ```golang config := &client.PoolConfig{ @@ -81,6 +82,20 @@ sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false) ``` +分布式或双活 + +```golang + +config := &client.PoolConfig{ + UserName: user, + Password: password, + NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","), + } +sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false) + +``` + + ### 使用sessionPool获取session,使用完手动调用PutBack 例1:设置存储组 diff --git a/client/session.go b/client/session.go index c2a29e7..f26699a 100644 --- a/client/session.go +++ b/client/session.go @@ -118,11 +118,12 @@ func (s *Session) Open(enableRPCCompression bool, connectionTimeoutInMs int) err } type ClusterConfig struct { - NodeUrls []string //ip:port - UserName string - Password string - FetchSize int32 - TimeZone string + NodeUrls []string //ip:port + UserName string + Password string + FetchSize int32 + TimeZone string + ConnectRetryMax int } type ClusterSession struct { @@ -975,12 +976,12 @@ func NewSession(config *Config) Session { return Session{config: config} } -func NewClusterSession(ClusterConfig *ClusterConfig) Session { +func NewClusterSession(clusterConfig *ClusterConfig) Session { session := Session{} node := endPoint{} - for i := 0; i < len(ClusterConfig.NodeUrls); i++ { - node.Host = strings.Split(ClusterConfig.NodeUrls[i], ":")[0] - node.Port = strings.Split(ClusterConfig.NodeUrls[i], ":")[1] + for i := 0; i < len(clusterConfig.NodeUrls); i++ { + node.Host = strings.Split(clusterConfig.NodeUrls[i], ":")[0] + node.Port = strings.Split(clusterConfig.NodeUrls[i], ":")[1] endPointList.PushBack(node) } var err error @@ -996,7 +997,7 @@ func NewClusterSession(ClusterConfig *ClusterConfig) Session { log.Println(err) } else { session.config = getConfig(e.Value.(endPoint).Host, e.Value.(endPoint).Port, - ClusterConfig.UserName, ClusterConfig.Password, ClusterConfig.FetchSize, ClusterConfig.TimeZone) + clusterConfig.UserName, clusterConfig.Password, clusterConfig.FetchSize, clusterConfig.TimeZone, clusterConfig.ConnectRetryMax) break } } @@ -1052,14 +1053,15 @@ func (s *Session) initClusterConn(node endPoint) error { } -func getConfig(host string, port string, userName string, passWord string, fetchSize int32, timeZone string) *Config { +func getConfig(host string, port string, userName string, passWord string, fetchSize int32, timeZone string, connectRetryMax int) *Config { return &Config{ - Host: host, - Port: port, - UserName: userName, - Password: passWord, - FetchSize: fetchSize, - TimeZone: timeZone, + Host: host, + Port: port, + UserName: userName, + Password: passWord, + FetchSize: fetchSize, + TimeZone: timeZone, + ConnectRetryMax: connectRetryMax, } } diff --git a/client/sessionpool.go b/client/sessionpool.go index dbcb7bb..156ce2a 100644 --- a/client/sessionpool.go +++ b/client/sessionpool.go @@ -78,7 +78,7 @@ func (spool *SessionPool) GetSession() (session Session, err error) { if ok { return session, nil } else { - log.Println("sessionpool has closed") + log.Println("sessionPool has closed") return session, errPoolClosed } default: @@ -93,11 +93,19 @@ func (spool *SessionPool) GetSession() (session Session, err error) { } } -func (spool *SessionPool) ConstructSession(config *PoolConfig) (Session, error) { - session := NewSession(getSessionConfig(config)) - if err := session.Open(spool.enableCompression, spool.connectionTimeoutInMs); err != nil { - log.Print(err) - return session, err +func (spool *SessionPool) ConstructSession(config *PoolConfig) (session Session, err error) { + if len(config.NodeUrls) > 0 { + session = NewClusterSession(getClusterSessionConfig(config)) + if err := session.OpenCluster(spool.enableCompression); err != nil { + log.Print(err) + return session, err + } + } else { + session = NewSession(getSessionConfig(config)) + if err := session.Open(spool.enableCompression, spool.connectionTimeoutInMs); err != nil { + log.Print(err) + return session, err + } } return session, nil } @@ -114,6 +122,17 @@ func getSessionConfig(config *PoolConfig) *Config { } } +func getClusterSessionConfig(config *PoolConfig) *ClusterConfig { + return &ClusterConfig{ + NodeUrls: config.NodeUrls, + UserName: config.UserName, + Password: config.Password, + FetchSize: config.FetchSize, + TimeZone: config.TimeZone, + ConnectRetryMax: config.ConnectRetryMax, + } +} + func (spool *SessionPool) PutBack(session Session) { if session.trans.IsOpen() { spool.ch <- session diff --git a/example/session_pool/session_pool_example.go b/example/session_pool/session_pool_example.go index 26a7fb7..c43a4fb 100644 --- a/example/session_pool/session_pool_example.go +++ b/example/session_pool/session_pool_example.go @@ -24,6 +24,7 @@ import ( "fmt" "log" "math/rand" + "strings" "sync" "time" @@ -60,8 +61,8 @@ func main() { wg.Add(1) go func() { defer wg.Done() - setStorageGroup(fmt.Sprintf("root.ln%d", j)) - deleteStorageGroup(fmt.Sprintf("root.ln%d", j)) + setStorageGroup(fmt.Sprintf("root.ln-%d", j)) + deleteStorageGroup(fmt.Sprintf("root.ln-%d", j)) }() @@ -134,17 +135,6 @@ func main() { insertAlignedTablets() deleteTimeseries("root.ln.device1.*") executeQueryStatement("show timeseries root.**") - for i := 0; i < 10000; i++ { - var j = i - wg.Add(1) - go func() { - defer wg.Done() - setStorageGroup(fmt.Sprintf("root.ln%d", j)) - deleteStorageGroup(fmt.Sprintf("root.ln%d", j)) - - }() - - } wg.Wait() } @@ -773,3 +763,22 @@ func checkError(status *rpc.TSStatus, err error) { } } } + +// If your IotDB is a cluster version or doubleLive, you can use the following code for session pool connection +func useSessionPool() { + + config := &client.PoolConfig{ + UserName: user, + Password: password, + NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","), + } + sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false) + defer sessionPool.Close() + session, err := sessionPool.GetSession() + defer sessionPool.PutBack(session) + if err != nil { + log.Print(err) + return + } + +}