Skip to content

Commit

Permalink
Merge branch 'tiny-craft:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
huangliu authored Jul 2, 2024
2 parents 5cdd1ba + ea44253 commit 1468eb2
Show file tree
Hide file tree
Showing 43 changed files with 850 additions and 1,088 deletions.
191 changes: 123 additions & 68 deletions backend/services/browser_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"tinyrdm/backend/types"
"tinyrdm/backend/utils/coll"
convutil "tinyrdm/backend/utils/convert"
maputil "tinyrdm/backend/utils/map"
redis2 "tinyrdm/backend/utils/redis"
sliceutil "tinyrdm/backend/utils/slice"
strutil "tinyrdm/backend/utils/string"
Expand Down Expand Up @@ -98,9 +99,9 @@ func (b *browserService) OpenConnection(name string) (resp types.JSResp) {
selConn := Connection().getConnection(name)
// correct last database index
lastDB := selConn.LastDB
if selConn.DBFilterType == "show" && !sliceutil.Contains(selConn.DBFilterList, lastDB) {
if selConn.DBFilterType == "show" && !slices.Contains(selConn.DBFilterList, lastDB) {
lastDB = selConn.DBFilterList[0]
} else if selConn.DBFilterType == "hide" && sliceutil.Contains(selConn.DBFilterList, lastDB) {
} else if selConn.DBFilterType == "hide" && slices.Contains(selConn.DBFilterList, lastDB) {
lastDB = selConn.DBFilterList[0]
}
if lastDB != selConn.LastDB {
Expand Down Expand Up @@ -222,32 +223,40 @@ func (b *browserService) OpenConnection(name string) (resp types.JSResp) {
}
}

// get redis server version
var version string
if res, err := client.Info(ctx, "server").Result(); err == nil || errors.Is(err, redis.Nil) {
info := b.parseInfo(res)
serverInfo := maputil.Get(info, "Server", map[string]string{})
version = maputil.Get(serverInfo, "redis_version", "1.0.0")
}

resp.Success = true
resp.Data = map[string]any{
"db": dbs,
"view": selConn.KeyView,
"lastDB": selConn.LastDB,
"db": dbs,
"view": selConn.KeyView,
"lastDB": selConn.LastDB,
"version": version,
}
return
}

// CloseConnection close redis server connection
func (b *browserService) CloseConnection(name string) (resp types.JSResp) {
item, ok := b.connMap[name]
if ok {
if item, ok := b.connMap[name]; ok {
delete(b.connMap, name)
if item.cancelFunc != nil {
item.cancelFunc()
}
if item.client != nil {
if item.cancelFunc != nil {
item.cancelFunc()
}
item.client.Close()
}
}
resp.Success = true
return
}

func (b *browserService) createRedisClient(selConn types.ConnectionConfig) (client redis.UniversalClient, err error) {
func (b *browserService) createRedisClient(ctx context.Context, selConn types.ConnectionConfig) (client redis.UniversalClient, err error) {
hook := redis2.NewHook(selConn.Name, func(cmd string, cost int64) {
now := time.Now()
//last := strings.LastIndex(cmd, ":")
Expand All @@ -268,10 +277,10 @@ func (b *browserService) createRedisClient(selConn types.ConnectionConfig) (clie
return
}

_ = client.Do(b.ctx, "CLIENT", "SETNAME", url.QueryEscape(selConn.Name)).Err()
_ = client.Do(ctx, "CLIENT", "SETNAME", url.QueryEscape(selConn.Name)).Err()
// add hook to each node in cluster mode
if cluster, ok := client.(*redis.ClusterClient); ok {
err = cluster.ForEachShard(b.ctx, func(ctx context.Context, cli *redis.Client) error {
err = cluster.ForEachShard(ctx, func(ctx context.Context, cli *redis.Client) error {
cli.AddHook(hook)
return nil
})
Expand All @@ -283,7 +292,7 @@ func (b *browserService) createRedisClient(selConn types.ConnectionConfig) (clie
client.AddHook(hook)
}

if _, err = client.Ping(b.ctx).Result(); err != nil && !errors.Is(err, redis.Nil) {
if _, err = client.Ping(ctx).Result(); err != nil && !errors.Is(err, redis.Nil) {
err = errors.New("can not connect to redis server:" + err.Error())
return
}
Expand Down Expand Up @@ -318,13 +327,18 @@ func (b *browserService) getRedisClient(server string, db int) (item *connection
err = fmt.Errorf("no match connection \"%s\"", server)
return
}

ctx, cancelFunc := context.WithCancel(b.ctx)
b.connMap[server] = &connectionItem{
ctx: ctx,
cancelFunc: cancelFunc,
}
var connConfig = selConn.ConnectionConfig
connConfig.LastDB = db
client, err = b.createRedisClient(connConfig)
client, err = b.createRedisClient(ctx, connConfig)
if err != nil {
return
}
ctx, cancelFunc := context.WithCancel(b.ctx)
item = &connectionItem{
client: client,
ctx: ctx,
Expand Down Expand Up @@ -445,7 +459,7 @@ func (b *browserService) scanKeys(ctx context.Context, client redis.UniversalCli
filterType := len(keyType) > 0
scanSize := int64(Preferences().GetScanSize())
// define sub scan function
scan := func(ctx context.Context, cli redis.UniversalClient, appendFunc func(k []any)) error {
scan := func(ctx context.Context, cli redis.UniversalClient, count int64, appendFunc func(k []any)) error {
var loadedKey []string
var scanCount int64
for {
Expand Down Expand Up @@ -475,16 +489,22 @@ func (b *browserService) scanKeys(ctx context.Context, client redis.UniversalCli
if cluster, ok := client.(*redis.ClusterClient); ok {
// cluster mode
var mutex sync.Mutex
var totalMaster int64
cluster.ForEachMaster(ctx, func(ctx context.Context, cli *redis.Client) error {
totalMaster += 1
return nil
})
partCount := count / max(totalMaster, 1)
err = cluster.ForEachMaster(ctx, func(ctx context.Context, cli *redis.Client) error {
// FIXME: BUG? can not fully load in cluster mode? maybe remove the shared "cursor"
return scan(ctx, cli, func(k []any) {
return scan(ctx, cli, partCount, func(k []any) {
mutex.Lock()
keys = append(keys, k...)
mutex.Unlock()
})
})
} else {
err = scan(ctx, client, func(k []any) {
err = scan(ctx, client, count, func(k []any) {
keys = append(keys, k...)
})
}
Expand Down Expand Up @@ -2003,21 +2023,13 @@ func (b *browserService) SetKeyTTL(server string, db int, k any, ttl int64) (res

// BatchSetTTL batch set ttl
func (b *browserService) BatchSetTTL(server string, db int, ks []any, ttl int64, serialNo string) (resp types.JSResp) {
conf := Connection().getConnection(server)
if conf == nil {
resp.Msg = fmt.Sprintf("no connection profile named: %s", server)
return
}
var client redis.UniversalClient
var err error
var connConfig = conf.ConnectionConfig
connConfig.LastDB = db
if client, err = b.createRedisClient(connConfig); err != nil {
item, err := b.getRedisClient(server, db)
if err != nil {
resp.Msg = err.Error()
return
}
client := item.client
ctx, cancelFunc := context.WithCancel(b.ctx)
defer client.Close()
defer cancelFunc()

//cancelEvent := "ttling:stop:" + serialNo
Expand All @@ -2041,7 +2053,7 @@ func (b *browserService) BatchSetTTL(server string, db int, ks []any, ttl int64,
//}
if i >= total-1 || time.Now().Sub(startTime).Milliseconds() > 100 {
startTime = time.Now()
//runtime.EventsEmit(b.ctx, processEvent, param)
//runtime.EventsEmit(ctx, processEvent, param)
// do some sleep to prevent blocking the Redis server
time.Sleep(10 * time.Millisecond)
}
Expand Down Expand Up @@ -2211,22 +2223,13 @@ func (b *browserService) DeleteOneKey(server string, db int, k any) (resp types.

// DeleteKeys delete keys sync with notification
func (b *browserService) DeleteKeys(server string, db int, ks []any, serialNo string) (resp types.JSResp) {
// connect a new connection to export keys
conf := Connection().getConnection(server)
if conf == nil {
resp.Msg = fmt.Sprintf("no connection profile named: %s", server)
return
}
var client redis.UniversalClient
var err error
var connConfig = conf.ConnectionConfig
connConfig.LastDB = db
if client, err = b.createRedisClient(connConfig); err != nil {
item, err := b.getRedisClient(server, db)
if err != nil {
resp.Msg = err.Error()
return
}
client := item.client
ctx, cancelFunc := context.WithCancel(b.ctx)
defer client.Close()
defer cancelFunc()

cancelEvent := "delete:stop:" + serialNo
Expand Down Expand Up @@ -2286,24 +2289,85 @@ func (b *browserService) DeleteKeys(server string, db int, ks []any, serialNo st
return
}

// ExportKey export keys
func (b *browserService) ExportKey(server string, db int, ks []any, path string, includeExpire bool) (resp types.JSResp) {
// connect a new connection to export keys
conf := Connection().getConnection(server)
if conf == nil {
resp.Msg = fmt.Sprintf("no connection profile named: %s", server)
// DeleteKeysByPattern delete keys by pattern
func (b *browserService) DeleteKeysByPattern(server string, db int, pattern string) (resp types.JSResp) {
item, err := b.getRedisClient(server, db)
if err != nil {
resp.Msg = err.Error()
return
}
var client redis.UniversalClient
var err error
var connConfig = conf.ConnectionConfig
connConfig.LastDB = db
if client, err = b.createRedisClient(connConfig); err != nil {
client := item.client
ctx, cancelFunc := context.WithCancel(b.ctx)
defer cancelFunc()

var ks []any
ks, _, err = b.scanKeys(ctx, client, pattern, "", 0, 0)
if err != nil {
resp.Msg = err.Error()
return
}

total := len(ks)
var canceled bool
var deletedKeys = make([]any, 0, total)
var mutex sync.Mutex
del := func(ctx context.Context, cli redis.UniversalClient) error {
const batchSize = 1000
for i := 0; i < total; i += batchSize {
pipe := cli.Pipeline()
for j := 0; j < batchSize; j++ {
if i+j < total {
pipe.Del(ctx, strutil.DecodeRedisKey(ks[i+j]))
}
}
cmders, delErr := pipe.Exec(ctx)
for j, cmder := range cmders {
if cmder.(*redis.IntCmd).Val() == 1 {
// save deleted key
mutex.Lock()
deletedKeys = append(deletedKeys, ks[i+j])
mutex.Unlock()
}
}
if errors.Is(delErr, context.Canceled) || canceled {
canceled = true
break
}
}
return nil
}

if cluster, ok := client.(*redis.ClusterClient); ok {
// cluster mode
err = cluster.ForEachMaster(ctx, func(ctx context.Context, cli *redis.Client) error {
return del(ctx, cli)
})
} else {
err = del(ctx, client)
}

resp.Success = true
resp.Data = struct {
Canceled bool `json:"canceled"`
Deleted any `json:"deleted"`
Failed int `json:"failed"`
}{
Canceled: canceled,
Deleted: deletedKeys,
Failed: len(ks) - len(deletedKeys),
}
return
}

// ExportKey export keys
func (b *browserService) ExportKey(server string, db int, ks []any, path string, includeExpire bool) (resp types.JSResp) {
item, err := b.getRedisClient(server, db)
if err != nil {
resp.Msg = err.Error()
return
}
client := item.client
ctx, cancelFunc := context.WithCancel(b.ctx)
defer client.Close()
defer cancelFunc()

file, err := os.Create(path)
Expand Down Expand Up @@ -2372,22 +2436,13 @@ func (b *browserService) ExportKey(server string, db int, ks []any, path string,

// ImportCSV import data from csv file
func (b *browserService) ImportCSV(server string, db int, path string, conflict int, ttl int64) (resp types.JSResp) {
// connect a new connection to export keys
conf := Connection().getConnection(server)
if conf == nil {
resp.Msg = fmt.Sprintf("no connection profile named: %s", server)
return
}
var client redis.UniversalClient
var err error
var connConfig = conf.ConnectionConfig
connConfig.LastDB = db
if client, err = b.createRedisClient(connConfig); err != nil {
item, err := b.getRedisClient(server, db)
if err != nil {
resp.Msg = err.Error()
return
}
client := item.client
ctx, cancelFunc := context.WithCancel(b.ctx)
defer client.Close()
defer cancelFunc()

file, err := os.Open(path)
Expand Down Expand Up @@ -2469,7 +2524,7 @@ func (b *browserService) ImportCSV(server string, db int, path string, conflict
"ignored": ignored,
//"processing": string(key),
}
runtime.EventsEmit(b.ctx, processEvent, param)
runtime.EventsEmit(ctx, processEvent, param)
// do some sleep to prevent blocking the Redis server
time.Sleep(10 * time.Millisecond)
}
Expand Down
8 changes: 4 additions & 4 deletions backend/storage/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package storage
import (
"errors"
"gopkg.in/yaml.v3"
"slices"
"sync"
"tinyrdm/backend/consts"
"tinyrdm/backend/types"
sliceutil "tinyrdm/backend/utils/slice"
)

type ConnectionsStorage struct {
Expand Down Expand Up @@ -256,10 +256,10 @@ func (c *ConnectionsStorage) SaveSortedConnection(sortedConns types.Connections)

conns := c.GetConnectionsFlat()
takeConn := func(name string) (types.Connection, bool) {
idx, ok := sliceutil.Find(conns, func(i int) bool {
return conns[i].Name == name
idx := slices.IndexFunc(conns, func(connection types.Connection) bool {
return connection.Name == name
})
if ok {
if idx >= 0 {
ret := conns[idx]
conns = append(conns[:idx], conns[idx+1:]...)
return ret, true
Expand Down
2 changes: 1 addition & 1 deletion backend/utils/convert/binary_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (BinaryConvert) Encode(str string) (string, bool) {

func (BinaryConvert) Decode(str string) (string, bool) {
var binary strings.Builder
for _, char := range str {
for _, char := range []byte(str) {
binary.WriteString(fmt.Sprintf("%08b", int(char)))
}
return binary.String(), true
Expand Down
Loading

0 comments on commit 1468eb2

Please sign in to comment.