From 67ea3f0438255ba96353aa8f79428fbdc3984e97 Mon Sep 17 00:00:00 2001 From: jc3wish Date: Mon, 21 Oct 2019 22:05:17 +0800 Subject: [PATCH] xdb update --- xdb/driver/driver.go | 7 ++- xdb/leveldb/leveldb.go | 22 ++++---- xdb/pool.go | 111 +++++++++++++++++++++++++++++++++++++++++ xdb/pool_test.go | 67 +++++++++++++++++++++++++ xdb/xdb.go | 28 +++++++---- xdb/xdb_test.go | 23 ++++++--- 6 files changed, 230 insertions(+), 28 deletions(-) create mode 100644 xdb/pool.go create mode 100644 xdb/pool_test.go diff --git a/xdb/driver/driver.go b/xdb/driver/driver.go index c5322aba..26a31dfe 100644 --- a/xdb/driver/driver.go +++ b/xdb/driver/driver.go @@ -26,10 +26,15 @@ type XdbDriver interface { GetKeyVal(key []byte) ([]byte,error) PutKeyVal(key []byte,val []byte) error DelKeyVal(key []byte) error - GetListByKeyPrefix(key []byte) ([]string,error) + GetListByKeyPrefix(key []byte) ([]ListValue,error) Close() error } +type ListValue struct { + Key string + Value string +} + func Register(name string, driver Driver,version string) { defer func() { if err := recover();err!=nil{ diff --git a/xdb/leveldb/leveldb.go b/xdb/leveldb/leveldb.go index 69aa9404..4c11bf32 100644 --- a/xdb/leveldb/leveldb.go +++ b/xdb/leveldb/leveldb.go @@ -5,8 +5,8 @@ import ( "github.com/syndtr/goleveldb/leveldb/util" "github.com/brokercap/Bifrost/xdb/driver" "os" - "log" "fmt" + "strings" ) const VERSION = "v1.1.0" @@ -40,9 +40,6 @@ type Conn struct { func (This *Conn) connect() error{ os.MkdirAll(This.path, 0700) This.levelDB, This.err = leveldb.OpenFile(This.path, nil) - if This.err != nil{ - log.Println("sdfsdfsd",This.err) - } return This.err } @@ -53,6 +50,9 @@ func (This *Conn) Close() (error){ func (This *Conn) GetKeyVal(key []byte) ([]byte,error){ s, err := This.levelDB.Get(key, nil) + if err != nil && strings.Contains(err.Error(),"not found"){ + return []byte(""),err + } return s,err } @@ -65,15 +65,15 @@ func (This *Conn) DelKeyVal(key []byte) error{ return This.levelDB.Delete(key,nil) } -func (This *Conn) GetListByKeyPrefix(key []byte) ([]string,error){ - data := make([]string,0) +func (This *Conn) GetListByKeyPrefix(key []byte) ([]driver.ListValue,error){ + data := make([]driver.ListValue,0) iter := This.levelDB.NewIterator(util.BytesPrefix(key), nil) for iter.Next() { - //tmp := make([][]byte,1) - //tmp[0] = iter.Key() - //tmp[1] = iter.Value() - //log.Println("tmp1:",string(iter.Value())) - data = append(data,string(iter.Value())) + data = append(data, + driver.ListValue{ + Key:string(iter.Key()), + Value:string(iter.Value()), + }) } iter.Release() return data,nil diff --git a/xdb/pool.go b/xdb/pool.go new file mode 100644 index 00000000..431104ce --- /dev/null +++ b/xdb/pool.go @@ -0,0 +1,111 @@ +package xdb + +import ( + "sync" + "time" + "fmt" + "runtime/debug" + "log" +) + +type PoolClient struct { + sync.RWMutex + ClientChan chan *Client + Uri string + MaxClientCount uint8 + CurrentClientCount uint8 + AvailableCount uint8 +} + +var clientPool map[string]*PoolClient + +func init() { + clientPool = make(map[string]*PoolClient,0) +} + +func InitClientPool(name string,uri string,count uint8) error{ + if _,ok:=clientPool[name];!ok{ + clientPool[name] = &PoolClient{ + ClientChan: make(chan *Client, int(count)), + Uri:uri, + MaxClientCount: count, + CurrentClientCount: 0, + AvailableCount:0, + } + } + return nil +} + +func GetClient(name string) (c *Client,err error) { + if _,ok:=clientPool[name];!ok{ + return nil,fmt.Errorf(name+" not esxit") + } + t := clientPool[name] + t.Lock() + if t.AvailableCount > 0 { + t.AvailableCount-- + t.Unlock() + //这里为什么不需要timeout,是因为前面加了lock 判断空闲连接数 + c = <- t.ClientChan + return + } + if t.MaxClientCount > t.CurrentClientCount{ + t.CurrentClientCount++ + t.Unlock() + f,stringKey := NewClient(name,t.Uri) + if f == nil{ + t.Lock() + t.CurrentClientCount-- + t.Unlock() + } + return f,stringKey + } + t.Unlock() + timer := time.NewTimer(5 * time.Second) + select { + case c = <-t.ClientChan: + break + case <- timer.C: + break + } + timer.Stop() + if c == nil{ + return nil,fmt.Errorf("get client time out") + } + t.Lock() + t.AvailableCount-- + t.Unlock() + return +} + +func BackCient(name string,c *Client) bool { + defer func() { + if err := recover();err !=nil{ + log.Println(string(debug.Stack())) + return + } + }() + if _,ok:=clientPool[name];!ok{ + return true + } + t:=clientPool[name] + t.Lock() + if t.CurrentClientCount > t.MaxClientCount{ + t.CurrentClientCount-- + func(){ + defer func() { + if err := recover();err != nil{ + log.Println(string(debug.Stack())) + return + } + }() + //调用插件函数,关闭连接,这里防止插件代码写得有问题,抛异常,所以这里需要recover一次 + c.Close() + }() + }else{ + t.AvailableCount++ + t.ClientChan <- c + } + t.Unlock() + return true +} \ No newline at end of file diff --git a/xdb/pool_test.go b/xdb/pool_test.go new file mode 100644 index 00000000..962e44b8 --- /dev/null +++ b/xdb/pool_test.go @@ -0,0 +1,67 @@ +package xdb_test + +import ( + "github.com/brokercap/Bifrost/xdb" + "testing" +) + +func setKeyVal(table,key1 string,value interface{}) error { + client,err := xdb.GetClient("leveldb") + if err != nil{ + return err + } + defer xdb.BackCient("leveldb",client) + client.SetPrefix("xdbtest").PutKeyVal(table,key1,value) + return nil +} + +func getKeyVal(table,key1 string) ([]byte,error) { + client,err := xdb.GetClient("leveldb") + if err != nil{ + return nil,err + } + defer xdb.BackCient("leveldb",client) + c,err := client.GetKeyValBytes(table,key1) + return c,err +} + +func TestPool(t *testing.T) { + xdb.InitClientPool("leveldb","./myleveldir4",1) + + type DataSource struct { + Name string + Uri string + } + + var table,key1,key2 string + var val1,val2 DataSource + + table = "data_source" + + key1 = "tstst1" + val1 = DataSource{Name:"sss",Uri:"URI1"} + err0 := setKeyVal(table,key1,val1) + if err0 != nil{ + t.Fatal(key1, " put error:",err0) + }else{ + t.Log(key1," put success") + } + + key2 = "tstst2" + val2 = DataSource{Name:"sss22",Uri:"URI1222"} + err0 = setKeyVal(table,key2,val2) + + if err0 != nil{ + t.Fatal(key2, " put error:",err0) + }else{ + t.Log(key2," put success") + } + + c,err1 := getKeyVal(table,key1) + t.Log(key1," c:",string(c),"err1:",err1) + + + c,err1 = getKeyVal(table,key2) + t.Log(key2," c:",string(c),"err1:",err1) + +} \ No newline at end of file diff --git a/xdb/xdb.go b/xdb/xdb.go index 53107955..a1cfc149 100644 --- a/xdb/xdb.go +++ b/xdb/xdb.go @@ -9,9 +9,10 @@ import ( _ "github.com/brokercap/Bifrost/xdb/leveldb" ) -const PREFIX = "xdb" +const DEFAULT_PREFIX = "xdb" type Client struct { + prefix string client driver.XdbDriver } @@ -22,11 +23,18 @@ func NewClient(name ,uri string) (*Client,error){ } return &Client{ client:client, + prefix:DEFAULT_PREFIX, },nil } +func (This *Client) SetPrefix(prefix string) *Client{ + This.prefix = prefix + return This +} + + func (This *Client) GetKeyVal(table,key string,data interface{}) ([]byte,error){ - myKey := []byte(PREFIX+"-"+table+"-"+key) + myKey := []byte(This.prefix+"-"+table+"-"+key) s,err := This.client.GetKeyVal(myKey) if err != nil{ return nil,err @@ -39,7 +47,7 @@ func (This *Client) GetKeyVal(table,key string,data interface{}) ([]byte,error){ } func (This *Client) PutKeyVal(table,key string,data interface{}) error{ - myKey := []byte(PREFIX+"-"+table+"-"+key) + myKey := []byte(This.prefix+"-"+table+"-"+key) val,err := json.Marshal(data) if err != nil{ return err @@ -49,25 +57,25 @@ func (This *Client) PutKeyVal(table,key string,data interface{}) error{ } func (This *Client) GetKeyValBytes(table,key string) ([]byte,error){ - myKey := []byte(PREFIX+"-"+table+"-"+key) + myKey := []byte(This.prefix+"-"+table+"-"+key) s,err := This.client.GetKeyVal(myKey) return s,err } func (This *Client) PutKeyValBytes(table,key string,val []byte) error{ - myKey := []byte(PREFIX+"-"+table+"-"+key) + myKey := []byte(This.prefix+"-"+table+"-"+key) err := This.client.PutKeyVal(myKey, val) return err } func (This *Client) DelKeyVal(table,key string) error{ - myKey := []byte(PREFIX+"-"+table+"-"+key) + myKey := []byte(This.prefix+"-"+table+"-"+key) return This.client.DelKeyVal(myKey) } -func (This *Client) GetListByKeyPrefix(table,key string,data interface{}) ([]string,error){ - myKey := []byte(PREFIX+"-"+table+"-"+key) +func (This *Client) GetListByKeyPrefix(table,key string,data interface{}) ([]driver.ListValue,error){ + myKey := []byte(This.prefix+"-"+table+"-"+key) s,err := This.client.GetListByKeyPrefix(myKey) if err != nil{ return s,err @@ -75,9 +83,9 @@ func (This *Client) GetListByKeyPrefix(table,key string,data interface{}) ([]str val := "" for _,v := range s{ if val == ""{ - val = v + val = v.Value }else{ - val += ","+v + val += ","+v.Value } } val = "["+val+"]" diff --git a/xdb/xdb_test.go b/xdb/xdb_test.go index 880b5e3d..f32b117d 100644 --- a/xdb/xdb_test.go +++ b/xdb/xdb_test.go @@ -14,6 +14,7 @@ func TestClient(t *testing.T) { os.Exit(1) } defer client.Close() + client.SetPrefix("xdbtest") type DataSource struct { Name string @@ -27,20 +28,30 @@ func TestClient(t *testing.T) { key1 = "tstst1" val1 = DataSource{Name:"sss",Uri:"URI1"} - client.PutKeyVal(table,key1,val1) + err0 := client.PutKeyVal(table,key1,val1) + if err0 != nil{ + t.Fatal(key1, " put error:",err0) + }else{ + t.Log(key1," put success") + } key2 = "tstst2" val2 = DataSource{Name:"sss22",Uri:"URI1222"} - client.PutKeyVal(table,key2,val2) + err0 = client.PutKeyVal(table,key2,val2) + + if err0 != nil{ + t.Fatal(key2, " put error:",err0) + }else{ + t.Log(key2," put success") + } var data1 DataSource c1,err1:=client.GetKeyVal(table,key1,&data1) t.Log("data1:",data1," c1:",string(c1)) if err1 != nil{ - log.Fatal(err1) + t.Fatal(err1) } - var data3 []DataSource c2,err2:=client.GetListByKeyPrefix(table,"",&data3) t.Log( " c2:",c2) @@ -48,8 +59,8 @@ func TestClient(t *testing.T) { t.Fatal(err2) } - for k,v := range c2{ - log.Println(k,"val:",string(v)) + for _,v := range c2{ + log.Println(v.Key,"val:",v.Value) } }