Skip to content

Commit

Permalink
xdb update
Browse files Browse the repository at this point in the history
  • Loading branch information
jc3wish authored and jc3wish committed Oct 21, 2019
1 parent 3deb3d2 commit 67ea3f0
Show file tree
Hide file tree
Showing 6 changed files with 230 additions and 28 deletions.
7 changes: 6 additions & 1 deletion xdb/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,15 @@ type XdbDriver interface {
GetKeyVal(key []byte) ([]byte,error)
PutKeyVal(key []byte,val []byte) error
DelKeyVal(key []byte) error
GetListByKeyPrefix(key []byte) ([]string,error)
GetListByKeyPrefix(key []byte) ([]ListValue,error)
Close() error
}

type ListValue struct {
Key string
Value string
}

func Register(name string, driver Driver,version string) {
defer func() {
if err := recover();err!=nil{
Expand Down
22 changes: 11 additions & 11 deletions xdb/leveldb/leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"github.com/syndtr/goleveldb/leveldb/util"
"github.com/brokercap/Bifrost/xdb/driver"
"os"
"log"
"fmt"
"strings"
)

const VERSION = "v1.1.0"
Expand Down Expand Up @@ -40,9 +40,6 @@ type Conn struct {
func (This *Conn) connect() error{
os.MkdirAll(This.path, 0700)
This.levelDB, This.err = leveldb.OpenFile(This.path, nil)
if This.err != nil{
log.Println("sdfsdfsd",This.err)
}
return This.err
}

Expand All @@ -53,6 +50,9 @@ func (This *Conn) Close() (error){

func (This *Conn) GetKeyVal(key []byte) ([]byte,error){
s, err := This.levelDB.Get(key, nil)
if err != nil && strings.Contains(err.Error(),"not found"){
return []byte(""),err
}
return s,err
}

Expand All @@ -65,15 +65,15 @@ func (This *Conn) DelKeyVal(key []byte) error{
return This.levelDB.Delete(key,nil)
}

func (This *Conn) GetListByKeyPrefix(key []byte) ([]string,error){
data := make([]string,0)
func (This *Conn) GetListByKeyPrefix(key []byte) ([]driver.ListValue,error){
data := make([]driver.ListValue,0)
iter := This.levelDB.NewIterator(util.BytesPrefix(key), nil)
for iter.Next() {
//tmp := make([][]byte,1)
//tmp[0] = iter.Key()
//tmp[1] = iter.Value()
//log.Println("tmp1:",string(iter.Value()))
data = append(data,string(iter.Value()))
data = append(data,
driver.ListValue{
Key:string(iter.Key()),
Value:string(iter.Value()),
})
}
iter.Release()
return data,nil
Expand Down
111 changes: 111 additions & 0 deletions xdb/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package xdb

import (
"sync"
"time"
"fmt"
"runtime/debug"
"log"
)

type PoolClient struct {
sync.RWMutex
ClientChan chan *Client
Uri string
MaxClientCount uint8
CurrentClientCount uint8
AvailableCount uint8
}

var clientPool map[string]*PoolClient

func init() {
clientPool = make(map[string]*PoolClient,0)
}

func InitClientPool(name string,uri string,count uint8) error{
if _,ok:=clientPool[name];!ok{
clientPool[name] = &PoolClient{
ClientChan: make(chan *Client, int(count)),
Uri:uri,
MaxClientCount: count,
CurrentClientCount: 0,
AvailableCount:0,
}
}
return nil
}

func GetClient(name string) (c *Client,err error) {
if _,ok:=clientPool[name];!ok{
return nil,fmt.Errorf(name+" not esxit")
}
t := clientPool[name]
t.Lock()
if t.AvailableCount > 0 {
t.AvailableCount--
t.Unlock()
//这里为什么不需要timeout,是因为前面加了lock 判断空闲连接数
c = <- t.ClientChan
return
}
if t.MaxClientCount > t.CurrentClientCount{
t.CurrentClientCount++
t.Unlock()
f,stringKey := NewClient(name,t.Uri)
if f == nil{
t.Lock()
t.CurrentClientCount--
t.Unlock()
}
return f,stringKey
}
t.Unlock()
timer := time.NewTimer(5 * time.Second)
select {
case c = <-t.ClientChan:
break
case <- timer.C:
break
}
timer.Stop()
if c == nil{
return nil,fmt.Errorf("get client time out")
}
t.Lock()
t.AvailableCount--
t.Unlock()
return
}

func BackCient(name string,c *Client) bool {
defer func() {
if err := recover();err !=nil{
log.Println(string(debug.Stack()))
return
}
}()
if _,ok:=clientPool[name];!ok{
return true
}
t:=clientPool[name]
t.Lock()
if t.CurrentClientCount > t.MaxClientCount{
t.CurrentClientCount--
func(){
defer func() {
if err := recover();err != nil{
log.Println(string(debug.Stack()))
return
}
}()
//调用插件函数,关闭连接,这里防止插件代码写得有问题,抛异常,所以这里需要recover一次
c.Close()
}()
}else{
t.AvailableCount++
t.ClientChan <- c
}
t.Unlock()
return true
}
67 changes: 67 additions & 0 deletions xdb/pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package xdb_test

import (
"github.com/brokercap/Bifrost/xdb"
"testing"
)

func setKeyVal(table,key1 string,value interface{}) error {
client,err := xdb.GetClient("leveldb")
if err != nil{
return err
}
defer xdb.BackCient("leveldb",client)
client.SetPrefix("xdbtest").PutKeyVal(table,key1,value)
return nil
}

func getKeyVal(table,key1 string) ([]byte,error) {
client,err := xdb.GetClient("leveldb")
if err != nil{
return nil,err
}
defer xdb.BackCient("leveldb",client)
c,err := client.GetKeyValBytes(table,key1)
return c,err
}

func TestPool(t *testing.T) {
xdb.InitClientPool("leveldb","./myleveldir4",1)

type DataSource struct {
Name string
Uri string
}

var table,key1,key2 string
var val1,val2 DataSource

table = "data_source"

key1 = "tstst1"
val1 = DataSource{Name:"sss",Uri:"URI1"}
err0 := setKeyVal(table,key1,val1)
if err0 != nil{
t.Fatal(key1, " put error:",err0)
}else{
t.Log(key1," put success")
}

key2 = "tstst2"
val2 = DataSource{Name:"sss22",Uri:"URI1222"}
err0 = setKeyVal(table,key2,val2)

if err0 != nil{
t.Fatal(key2, " put error:",err0)
}else{
t.Log(key2," put success")
}

c,err1 := getKeyVal(table,key1)
t.Log(key1," c:",string(c),"err1:",err1)


c,err1 = getKeyVal(table,key2)
t.Log(key2," c:",string(c),"err1:",err1)

}
28 changes: 18 additions & 10 deletions xdb/xdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ import (
_ "github.com/brokercap/Bifrost/xdb/leveldb"
)

const PREFIX = "xdb"
const DEFAULT_PREFIX = "xdb"

type Client struct {
prefix string
client driver.XdbDriver
}

Expand All @@ -22,11 +23,18 @@ func NewClient(name ,uri string) (*Client,error){
}
return &Client{
client:client,
prefix:DEFAULT_PREFIX,
},nil
}

func (This *Client) SetPrefix(prefix string) *Client{
This.prefix = prefix
return This
}


func (This *Client) GetKeyVal(table,key string,data interface{}) ([]byte,error){
myKey := []byte(PREFIX+"-"+table+"-"+key)
myKey := []byte(This.prefix+"-"+table+"-"+key)
s,err := This.client.GetKeyVal(myKey)
if err != nil{
return nil,err
Expand All @@ -39,7 +47,7 @@ func (This *Client) GetKeyVal(table,key string,data interface{}) ([]byte,error){
}

func (This *Client) PutKeyVal(table,key string,data interface{}) error{
myKey := []byte(PREFIX+"-"+table+"-"+key)
myKey := []byte(This.prefix+"-"+table+"-"+key)
val,err := json.Marshal(data)
if err != nil{
return err
Expand All @@ -49,35 +57,35 @@ func (This *Client) PutKeyVal(table,key string,data interface{}) error{
}

func (This *Client) GetKeyValBytes(table,key string) ([]byte,error){
myKey := []byte(PREFIX+"-"+table+"-"+key)
myKey := []byte(This.prefix+"-"+table+"-"+key)
s,err := This.client.GetKeyVal(myKey)
return s,err
}

func (This *Client) PutKeyValBytes(table,key string,val []byte) error{
myKey := []byte(PREFIX+"-"+table+"-"+key)
myKey := []byte(This.prefix+"-"+table+"-"+key)
err := This.client.PutKeyVal(myKey, val)
return err
}


func (This *Client) DelKeyVal(table,key string) error{
myKey := []byte(PREFIX+"-"+table+"-"+key)
myKey := []byte(This.prefix+"-"+table+"-"+key)
return This.client.DelKeyVal(myKey)
}

func (This *Client) GetListByKeyPrefix(table,key string,data interface{}) ([]string,error){
myKey := []byte(PREFIX+"-"+table+"-"+key)
func (This *Client) GetListByKeyPrefix(table,key string,data interface{}) ([]driver.ListValue,error){
myKey := []byte(This.prefix+"-"+table+"-"+key)
s,err := This.client.GetListByKeyPrefix(myKey)
if err != nil{
return s,err
}
val := ""
for _,v := range s{
if val == ""{
val = v
val = v.Value
}else{
val += ","+v
val += ","+v.Value
}
}
val = "["+val+"]"
Expand Down
23 changes: 17 additions & 6 deletions xdb/xdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ func TestClient(t *testing.T) {
os.Exit(1)
}
defer client.Close()
client.SetPrefix("xdbtest")

type DataSource struct {
Name string
Expand All @@ -27,29 +28,39 @@ func TestClient(t *testing.T) {

key1 = "tstst1"
val1 = DataSource{Name:"sss",Uri:"URI1"}
client.PutKeyVal(table,key1,val1)
err0 := client.PutKeyVal(table,key1,val1)
if err0 != nil{
t.Fatal(key1, " put error:",err0)
}else{
t.Log(key1," put success")
}

key2 = "tstst2"
val2 = DataSource{Name:"sss22",Uri:"URI1222"}
client.PutKeyVal(table,key2,val2)
err0 = client.PutKeyVal(table,key2,val2)

if err0 != nil{
t.Fatal(key2, " put error:",err0)
}else{
t.Log(key2," put success")
}

var data1 DataSource
c1,err1:=client.GetKeyVal(table,key1,&data1)
t.Log("data1:",data1," c1:",string(c1))
if err1 != nil{
log.Fatal(err1)
t.Fatal(err1)
}


var data3 []DataSource
c2,err2:=client.GetListByKeyPrefix(table,"",&data3)
t.Log( " c2:",c2)
if err2 != nil{
t.Fatal(err2)
}

for k,v := range c2{
log.Println(k,"val:",string(v))
for _,v := range c2{
log.Println(v.Key,"val:",v.Value)
}

}

0 comments on commit 67ea3f0

Please sign in to comment.