From b27a8de25d433c6e67d505a9fd6b95880aa6c3c8 Mon Sep 17 00:00:00 2001 From: zhanghaisheng1 Date: Wed, 28 Jul 2021 21:08:55 +0800 Subject: [PATCH 1/6] feat: add zrangebylex/zrevrangebylex --- command/common.go | 22 +++++++++++++++++ command/init.go | 4 +++ command/zsets.go | 61 +++++++++++++++++++++++++++++++++++++++++++++ db/zset.go | 63 +++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 150 insertions(+) diff --git a/command/common.go b/command/common.go index 6ac18ba..8603c22 100644 --- a/command/common.go +++ b/command/common.go @@ -245,6 +245,28 @@ func getFloatAndInclude(strf string) (float64, bool, error) { } +func getLexKeyAndInclude(prefix []byte) ([]byte,bool) { + key := []byte{} + include := true + if string(prefix) == "+" || string(prefix) == "-" { + return key,include + } + + if len(prefix) > 0 { + switch prefix[0]{ + case byte('('): + key = prefix[1:] + include = false + case byte('['): + key = prefix[1:] + default: + key = prefix + } + } + return key,include +} + + func getLimitParameters(offsetCount []string) (int64, int64, error) { if len(offsetCount) < 2 { return 0, 0, ErrSyntax diff --git a/command/init.go b/command/init.go index c7bd578..fd922e7 100644 --- a/command/init.go +++ b/command/init.go @@ -124,6 +124,10 @@ func init() { "zrevrange": Desc{Proc: AutoCommit(ZRevRange), Txn: ZRevRange, Cons: Constraint{-4, flags("rF"), 1, 1, 1}}, "zrangebyscore": Desc{Proc: AutoCommit(ZRangeByScore), Txn: ZRangeByScore, Cons: Constraint{-4, flags("rF"), 1, 1, 1}}, "zrevrangebyscore": Desc{Proc: AutoCommit(ZRevRangeByScore), Txn: ZRevRangeByScore, Cons: Constraint{-4, flags("rF"), 1, 1, 1}}, + "zrangebylex": Desc{Proc: AutoCommit(ZRangeByLex), Txn: ZRangeByScore, Cons: Constraint{-4, flags("rF"), 1, 1, 1}}, + "zrevrangebylex": Desc{Proc: AutoCommit(ZRevRangeByLex), Txn: ZRevRangeByScore, Cons: Constraint{-4, flags("rF"), 1, 1, 1}}, + + "zrem": Desc{Proc: AutoCommit(ZRem), Txn: ZRem, Cons: Constraint{-3, flags("wF"), 1, 1, 1}}, "zcard": Desc{Proc: AutoCommit(ZCard), Txn: ZCard, Cons: Constraint{2, flags("rF"), 1, 1, 1}}, "zcount": Desc{Proc: AutoCommit(ZCount), Txn: ZCount, Cons: Constraint{-4, flags("rF"), 1, 1, 1}}, diff --git a/command/zsets.go b/command/zsets.go index 752cd4f..6965ca1 100644 --- a/command/zsets.go +++ b/command/zsets.go @@ -8,6 +8,7 @@ import ( "github.com/distributedio/titan/db" "github.com/distributedio/titan/encoding/resp" + "go.uber.org/zap" ) // ZAdd adds the specified members with scores to the sorted set @@ -105,10 +106,19 @@ func ZRangeByScore(ctx *Context, txn *db.Transaction) (OnCommit, error) { return zAnyOrderRangeByScore(ctx, txn, true) } +func ZRangeByLex(ctx *Context, txn *db.Transaction) (OnCommit, error) { + return zAnyOrderRangeByLex(ctx, txn, true) +} + func ZRevRangeByScore(ctx *Context, txn *db.Transaction) (OnCommit, error) { return zAnyOrderRangeByScore(ctx, txn, false) } +func ZRevRangeByLex(ctx *Context, txn *db.Transaction) (OnCommit, error) { + return zAnyOrderRangeByLex(ctx, txn, false) +} + + func ZCount(ctx *Context, txn *db.Transaction) (OnCommit, error) { key := []byte(ctx.Args[0]) startScore, startInclude, err := getFloatAndInclude(ctx.Args[1]) @@ -144,6 +154,57 @@ func ZCount(ctx *Context, txn *db.Transaction) (OnCommit, error) { return Integer(ctx.Out, int64(len(items))), nil } +func zAnyOrderRangeByLex(ctx *Context, txn *db.Transaction, positiveOrder bool) (OnCommit, error) { + key := []byte(ctx.Args[0]) + + startKey, startInclude := getLexKeyAndInclude([]byte(ctx.Args[1])) + stopKey,stopInclude := getLexKeyAndInclude([]byte(ctx.Args[2])) + if !positiveOrder{ + startKey, startInclude = getLexKeyAndInclude([]byte(ctx.Args[2])) + stopKey,stopInclude = getLexKeyAndInclude([]byte(ctx.Args[1])) + } + + zap.L().Info("zset lex start", zap.String("start",string(startKey)),zap.Bool("includestart",startInclude),zap.String("stopkey",string(stopKey)),zap.Bool("stopInclude",stopInclude)) + var( + offset int64 = 0 + count int64 = math.MaxInt64 + err error + ) + for i := 3; i < len(ctx.Args); i++ { + switch strings.ToUpper(ctx.Args[i]) { + case "LIMIT": + if offset, count, err = getLimitParameters(ctx.Args[i+1:]); err != nil { + return nil, err + } + i += 2 + default: + return nil, ErrSyntax + } + } + + zap.L().Info("zset lex start", zap.String("start",string(startKey)),zap.Bool("includestart",startInclude),zap.String("stopkey",string(stopKey)),zap.Bool("stopInclude",stopInclude),zap.Int64("offset",offset),zap.Int64("count",count)) + zset, err := txn.ZSet(key) + if err != nil { + if err == db.ErrTypeMismatch { + return nil, ErrTypeMismatch + } + return nil, errors.New("ERR " + err.Error()) + } + if !zset.Exist() { + return BytesArray(ctx.Out, nil), nil + } + + items, err := zset.ZAnyOrderRangeByLex(startKey,startInclude, stopKey,stopInclude,offset, count, positiveOrder) + if err != nil { + return nil, errors.New("ERR " + err.Error()) + } + if len(items) == 0 { + return BytesArray(ctx.Out, nil), nil + } + return BytesArray(ctx.Out, items), nil +} + + func zAnyOrderRangeByScore(ctx *Context, txn *db.Transaction, positiveOrder bool) (OnCommit, error) { key := []byte(ctx.Args[0]) diff --git a/db/zset.go b/db/zset.go index 82146d8..ab37d03 100644 --- a/db/zset.go +++ b/db/zset.go @@ -183,6 +183,67 @@ func (zset *ZSet) Exist() bool { return zset.meta.Len != 0 } +func (zset *ZSet) ZAnyOrderRangeByLex(start []byte,startInclude bool, stop []byte,stopInclude bool,offset int64,count int64, positiveOrder bool) ([][]byte, error) { + dkey := DataKey(zset.txn.db, zset.meta.ID) + memPrefix := zsetMemberKey(dkey,[]byte{}) + startPrefix := zsetMemberKey(dkey,start) + stopPrefix := zsetMemberKey(dkey,stop) + if stopInclude || len(stop) == 0 { + stopPrefix = kv.Key(stopPrefix).PrefixNext() + } + + var iter Iterator + var err error + + if positiveOrder { + iter, err = zset.txn.t.Iter(startPrefix, stopPrefix) + } else { + iter, err = zset.txn.t.IterReverse(stopPrefix) + } + + if err != nil { + return nil, err + } + + var items [][]byte + var idx int64 = -1 + + for iter.Valid() && iter.Key().HasPrefix(memPrefix){ + startCmp := iter.Key().Cmp(startPrefix) + stopCmp := iter.Key().Cmp(stopPrefix) + member := iter.Key()[len(memPrefix):] + zap.L().Info("zset lex db item",zap.Int("sr",startCmp),zap.Int("so",stopCmp),zap.String("mem",string(member)),zap.String("stopp",string(stopPrefix)),zap.Bool("stopInclude",stopInclude)) + if (!positiveOrder && startCmp < 0) || (positiveOrder && stopCmp > 0 ) { + break + } + + //ignore startPrefix ----> startKey range + if startCmp == 0 && !startInclude { + goto next + } + + //ignore stopPrefix ----> stopKey range + if len(stop) > 0 && ( kv.Key(member).Cmp(stop) > 0 || kv.Key(member).Cmp(stop) == 0 && !stopInclude) { + goto next + } + + idx++ + if offset > idx { + goto next + } + items = append(items,member) + if int64(len(items)) == count{ + break + } + + next: if err := iter.Next(); err != nil { + return nil,err + } + } + return items,nil +} + + func (zset *ZSet) ZAnyOrderRange(start int64, stop int64, withScore bool, positiveOrder bool) ([][]byte, error) { if stop < 0 { if stop = zset.meta.Len + stop; stop < 0 { @@ -505,3 +566,5 @@ func zsetScoreKey(dkey []byte, score []byte, member []byte) []byte { scoreKey = append(scoreKey, member...) return scoreKey } + + From 806d3aafff3efe867fa7a5dcfd8479c289dfff69 Mon Sep 17 00:00:00 2001 From: zhanghaisheng1 Date: Thu, 29 Jul 2021 20:49:05 +0800 Subject: [PATCH 2/6] feat: add zremrangebylex --- command/init.go | 1 + command/zsets.go | 40 ++++++++++++++++++++------ db/zset.go | 74 +++++++++++++++++++++++++++++++++++++++++++----- 3 files changed, 99 insertions(+), 16 deletions(-) diff --git a/command/init.go b/command/init.go index fd922e7..46a27ab 100644 --- a/command/init.go +++ b/command/init.go @@ -129,6 +129,7 @@ func init() { "zrem": Desc{Proc: AutoCommit(ZRem), Txn: ZRem, Cons: Constraint{-3, flags("wF"), 1, 1, 1}}, + "zremrangebylex": Desc{Proc: AutoCommit(ZRemRangeByLex), Txn: ZRemRangeByLex, Cons: Constraint{-4, flags("wF"), 1, 1, 1}}, "zcard": Desc{Proc: AutoCommit(ZCard), Txn: ZCard, Cons: Constraint{2, flags("rF"), 1, 1, 1}}, "zcount": Desc{Proc: AutoCommit(ZCount), Txn: ZCount, Cons: Constraint{-4, flags("rF"), 1, 1, 1}}, "zscore": Desc{Proc: AutoCommit(ZScore), Txn: ZScore, Cons: Constraint{3, flags("rF"), 1, 1, 1}}, diff --git a/command/zsets.go b/command/zsets.go index 6965ca1..c45fa57 100644 --- a/command/zsets.go +++ b/command/zsets.go @@ -8,7 +8,6 @@ import ( "github.com/distributedio/titan/db" "github.com/distributedio/titan/encoding/resp" - "go.uber.org/zap" ) // ZAdd adds the specified members with scores to the sorted set @@ -106,19 +105,18 @@ func ZRangeByScore(ctx *Context, txn *db.Transaction) (OnCommit, error) { return zAnyOrderRangeByScore(ctx, txn, true) } -func ZRangeByLex(ctx *Context, txn *db.Transaction) (OnCommit, error) { - return zAnyOrderRangeByLex(ctx, txn, true) -} - func ZRevRangeByScore(ctx *Context, txn *db.Transaction) (OnCommit, error) { return zAnyOrderRangeByScore(ctx, txn, false) } +func ZRangeByLex(ctx *Context, txn *db.Transaction) (OnCommit, error) { + return zAnyOrderRangeByLex(ctx, txn, true) +} + func ZRevRangeByLex(ctx *Context, txn *db.Transaction) (OnCommit, error) { return zAnyOrderRangeByLex(ctx, txn, false) } - func ZCount(ctx *Context, txn *db.Transaction) (OnCommit, error) { key := []byte(ctx.Args[0]) startScore, startInclude, err := getFloatAndInclude(ctx.Args[1]) @@ -164,9 +162,8 @@ func zAnyOrderRangeByLex(ctx *Context, txn *db.Transaction, positiveOrder bool) stopKey,stopInclude = getLexKeyAndInclude([]byte(ctx.Args[1])) } - zap.L().Info("zset lex start", zap.String("start",string(startKey)),zap.Bool("includestart",startInclude),zap.String("stopkey",string(stopKey)),zap.Bool("stopInclude",stopInclude)) var( - offset int64 = 0 + offset int64 = int64(0) count int64 = math.MaxInt64 err error ) @@ -177,12 +174,12 @@ func zAnyOrderRangeByLex(ctx *Context, txn *db.Transaction, positiveOrder bool) return nil, err } i += 2 + break default: return nil, ErrSyntax } } - zap.L().Info("zset lex start", zap.String("start",string(startKey)),zap.Bool("includestart",startInclude),zap.String("stopkey",string(stopKey)),zap.Bool("stopInclude",stopInclude),zap.Int64("offset",offset),zap.Int64("count",count)) zset, err := txn.ZSet(key) if err != nil { if err == db.ErrTypeMismatch { @@ -292,6 +289,31 @@ func ZRem(ctx *Context, txn *db.Transaction) (OnCommit, error) { return Integer(ctx.Out, deleted), nil } +func ZRemRangeByLex(ctx *Context, txn *db.Transaction) (OnCommit, error) { + key := []byte(ctx.Args[0]) + startKey, startInclude := getLexKeyAndInclude([]byte(ctx.Args[1])) + stopKey,stopInclude := getLexKeyAndInclude([]byte(ctx.Args[2])) + zset, err := txn.ZSet(key) + if err != nil { + if err == db.ErrTypeMismatch { + return nil, ErrTypeMismatch + } + return nil, errors.New("ERR " + err.Error()) + } + if !zset.Exist() { + return Integer(ctx.Out, 0), nil + } + + deleted, err := zset.ZRemRangeByLex(startKey,startInclude,stopKey,stopInclude) + if err != nil { + return nil, errors.New("ERR " + err.Error()) + } + + return Integer(ctx.Out, deleted), nil +} + + + func ZCard(ctx *Context, txn *db.Transaction) (OnCommit, error) { key := []byte(ctx.Args[0]) diff --git a/db/zset.go b/db/zset.go index ab37d03..75553de 100644 --- a/db/zset.go +++ b/db/zset.go @@ -191,9 +191,12 @@ func (zset *ZSet) ZAnyOrderRangeByLex(start []byte,startInclude bool, stop []byt if stopInclude || len(stop) == 0 { stopPrefix = kv.Key(stopPrefix).PrefixNext() } - - var iter Iterator - var err error + var( + iter Iterator + items [][]byte + idx int64 = -1 + err error + ) if positiveOrder { iter, err = zset.txn.t.Iter(startPrefix, stopPrefix) @@ -205,14 +208,10 @@ func (zset *ZSet) ZAnyOrderRangeByLex(start []byte,startInclude bool, stop []byt return nil, err } - var items [][]byte - var idx int64 = -1 - for iter.Valid() && iter.Key().HasPrefix(memPrefix){ startCmp := iter.Key().Cmp(startPrefix) stopCmp := iter.Key().Cmp(stopPrefix) member := iter.Key()[len(memPrefix):] - zap.L().Info("zset lex db item",zap.Int("sr",startCmp),zap.Int("so",stopCmp),zap.String("mem",string(member)),zap.String("stopp",string(stopPrefix)),zap.Bool("stopInclude",stopInclude)) if (!positiveOrder && startCmp < 0) || (positiveOrder && stopCmp > 0 ) { break } @@ -474,6 +473,67 @@ func (zset *ZSet) ZRem(members [][]byte) (int64, error) { zap.L().Debug("zrem update meta key", zap.Int64("cost(us)", time.Since(start).Nanoseconds()/1000)) return deleted, err } + +func (zset *ZSet) ZRemRangeByLex(start []byte,startInclude bool, stop []byte,stopInclude bool)(int64,error){ + deleted := int64(0); + dkey := DataKey(zset.txn.db, zset.meta.ID) + memPrefix := zsetMemberKey(dkey,[]byte{}) + startPrefix := zsetMemberKey(dkey,start) + stopPrefix := zsetMemberKey(dkey,stop) + if stopInclude || len(stop) == 0 { + stopPrefix = kv.Key(stopPrefix).PrefixNext() + } + var( + iter Iterator + scoreKey []byte + err error + ) + + if iter, err = zset.txn.t.Iter(startPrefix, stopPrefix);err != nil { + return deleted, err + } + + for iter.Valid() && iter.Key().HasPrefix(memPrefix){ + startCmp := iter.Key().Cmp(startPrefix) + stopCmp := iter.Key().Cmp(stopPrefix) + member := iter.Key()[len(memPrefix):] + if ( stopCmp > 0 ) { + break + } + + if startCmp == 0 && !startInclude { + goto next + } + scoreKey = zsetScoreKey(dkey,iter.Value(),member) + + if err = zset.txn.t.Delete(scoreKey);err != nil { + return deleted,err + } + if err = zset.txn.t.Delete(iter.Key());err != nil { + return deleted,err + } + deleted++ + next: if err := iter.Next(); err != nil { + return deleted,err + } + } + if zset.meta.Len == deleted { + mkey := MetaKey(zset.txn.db, zset.key) + err = zset.txn.t.Delete(mkey) + if err != nil { + return deleted, err + } + if zset.meta.Object.ExpireAt > 0 { + if err = unExpireAt(zset.txn.t, mkey, zset.meta.Object.ExpireAt);err != nil { + return deleted,err + } + } + return deleted, nil + } + + return deleted,nil +} + func (zset *ZSet) ZCard() int64 { return zset.meta.Len } From 6a11842b0bcb7a085f3b993646872afa1e7e4e81 Mon Sep 17 00:00:00 2001 From: zhanghaisheng1 Date: Thu, 29 Jul 2021 20:53:35 +0800 Subject: [PATCH 3/6] feaut: update command list --- docs/command_list.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/command_list.md b/docs/command_list.md index f50cf21..437a4eb 100644 --- a/docs/command_list.md +++ b/docs/command_list.md @@ -139,27 +139,27 @@ - [ ] bzpopmax - [x] zadd - [x] zcard -- [ ] zcount +- [x] zcount - [ ] zincrby - [ ] zinterstore - [ ] zlexcount - [ ] zpopmax - [ ] zpopmin - [x] zrange -- [ ] zrangebylex -- [ ] zrevrangebylex +- [x] zrangebylex +- [x] zrevrangebylex - [x] zrangebyscore - [ ] zrank - [x] zrem -- [ ] zremrangebylex +- [x] zremrangebylex - [ ] zremrangebyrank - [ ] zremrangebyscore - [x] zrevrange -- [ ] zrevrangebyscore +- [x] zrevrangebyscore - [ ] zrevrank - [x] zscore - [ ] zunionstore -- [ ] zscan +- [x] zscan ### Geo From 3ebc0784ce148796a954caf1ff61b13471faa5be Mon Sep 17 00:00:00 2001 From: nio Date: Sat, 31 Jul 2021 12:25:51 +0800 Subject: [PATCH 4/6] rewrite zrangebylex --- command/zsets.go | 21 +++---- db/db.go | 3 + db/zset.go | 161 ++++++++++++++++++++++++----------------------- 3 files changed, 96 insertions(+), 89 deletions(-) diff --git a/command/zsets.go b/command/zsets.go index c45fa57..d766da6 100644 --- a/command/zsets.go +++ b/command/zsets.go @@ -156,16 +156,16 @@ func zAnyOrderRangeByLex(ctx *Context, txn *db.Transaction, positiveOrder bool) key := []byte(ctx.Args[0]) startKey, startInclude := getLexKeyAndInclude([]byte(ctx.Args[1])) - stopKey,stopInclude := getLexKeyAndInclude([]byte(ctx.Args[2])) - if !positiveOrder{ + stopKey, stopInclude := getLexKeyAndInclude([]byte(ctx.Args[2])) + if !positiveOrder { startKey, startInclude = getLexKeyAndInclude([]byte(ctx.Args[2])) - stopKey,stopInclude = getLexKeyAndInclude([]byte(ctx.Args[1])) + stopKey, stopInclude = getLexKeyAndInclude([]byte(ctx.Args[1])) } - var( + var ( offset int64 = int64(0) - count int64 = math.MaxInt64 - err error + count int64 = math.MaxInt64 + err error ) for i := 3; i < len(ctx.Args); i++ { switch strings.ToUpper(ctx.Args[i]) { @@ -191,7 +191,7 @@ func zAnyOrderRangeByLex(ctx *Context, txn *db.Transaction, positiveOrder bool) return BytesArray(ctx.Out, nil), nil } - items, err := zset.ZAnyOrderRangeByLex(startKey,startInclude, stopKey,stopInclude,offset, count, positiveOrder) + items, err := zset.ZOrderRangeByLex(startKey, stopKey, startInclude, stopInclude, offset, count, positiveOrder) if err != nil { return nil, errors.New("ERR " + err.Error()) } @@ -201,7 +201,6 @@ func zAnyOrderRangeByLex(ctx *Context, txn *db.Transaction, positiveOrder bool) return BytesArray(ctx.Out, items), nil } - func zAnyOrderRangeByScore(ctx *Context, txn *db.Transaction, positiveOrder bool) (OnCommit, error) { key := []byte(ctx.Args[0]) @@ -292,7 +291,7 @@ func ZRem(ctx *Context, txn *db.Transaction) (OnCommit, error) { func ZRemRangeByLex(ctx *Context, txn *db.Transaction) (OnCommit, error) { key := []byte(ctx.Args[0]) startKey, startInclude := getLexKeyAndInclude([]byte(ctx.Args[1])) - stopKey,stopInclude := getLexKeyAndInclude([]byte(ctx.Args[2])) + stopKey, stopInclude := getLexKeyAndInclude([]byte(ctx.Args[2])) zset, err := txn.ZSet(key) if err != nil { if err == db.ErrTypeMismatch { @@ -304,7 +303,7 @@ func ZRemRangeByLex(ctx *Context, txn *db.Transaction) (OnCommit, error) { return Integer(ctx.Out, 0), nil } - deleted, err := zset.ZRemRangeByLex(startKey,startInclude,stopKey,stopInclude) + deleted, err := zset.ZRemRangeByLex(startKey, stopKey, startInclude, stopInclude) if err != nil { return nil, errors.New("ERR " + err.Error()) } @@ -312,8 +311,6 @@ func ZRemRangeByLex(ctx *Context, txn *db.Transaction) (OnCommit, error) { return Integer(ctx.Out, deleted), nil } - - func ZCard(ctx *Context, txn *db.Transaction) (OnCommit, error) { key := []byte(ctx.Args[0]) diff --git a/db/db.go b/db/db.go index f65f3be..46a776f 100644 --- a/db/db.go +++ b/db/db.go @@ -62,6 +62,9 @@ var ( // Iterator store.Iterator type Iterator store.Iterator +// Iterator Callback +type FnCall func(key, val []byte) bool + // DBID is the redis database ID type DBID byte diff --git a/db/zset.go b/db/zset.go index 75553de..c37bee2 100644 --- a/db/zset.go +++ b/db/zset.go @@ -183,66 +183,52 @@ func (zset *ZSet) Exist() bool { return zset.meta.Len != 0 } -func (zset *ZSet) ZAnyOrderRangeByLex(start []byte,startInclude bool, stop []byte,stopInclude bool,offset int64,count int64, positiveOrder bool) ([][]byte, error) { - dkey := DataKey(zset.txn.db, zset.meta.ID) - memPrefix := zsetMemberKey(dkey,[]byte{}) - startPrefix := zsetMemberKey(dkey,start) - stopPrefix := zsetMemberKey(dkey,stop) - if stopInclude || len(stop) == 0 { - stopPrefix = kv.Key(stopPrefix).PrefixNext() - } - var( - iter Iterator - items [][]byte - idx int64 = -1 - err error +func (zset *ZSet) ZOrderRangeByLex(start, stop []byte, startInclude, stopInclude bool, offset int64, count int64, positiveOrder bool) ([][]byte, error) { + var ( + items [][]byte + idx int64 = -1 + err error ) - - if positiveOrder { - iter, err = zset.txn.t.Iter(startPrefix, stopPrefix) - } else { - iter, err = zset.txn.t.IterReverse(stopPrefix) - } - - if err != nil { - return nil, err - } - - for iter.Valid() && iter.Key().HasPrefix(memPrefix){ - startCmp := iter.Key().Cmp(startPrefix) - stopCmp := iter.Key().Cmp(stopPrefix) - member := iter.Key()[len(memPrefix):] - if (!positiveOrder && startCmp < 0) || (positiveOrder && stopCmp > 0 ) { - break - } - - //ignore startPrefix ----> startKey range + dkey := DataKey(zset.txn.db, zset.meta.ID) + memPrefix := zsetMemberKey(dkey, []byte{}) + zap.L().Info("zorderrange", zap.String("sr", string(start)), zap.String("so", string(stop)), zap.Bool("starin", startInclude), zap.Bool("stopin", stopInclude), zap.Bool("position", positiveOrder)) + f := func(key, val []byte) bool { + member := key[len(memPrefix):] + zap.L().Info("iterm", zap.String("K", string(key)), zap.String("v", string(val))) + startCmp := kv.Key(member).Cmp(start) + stopCmp := kv.Key(member).Cmp(stop) + if len(stop) == 0 { + stopCmp = -1 + } + if (!positiveOrder && startCmp < 0) || (positiveOrder && stopCmp > 0) { + return false + } + //ignore startPrefix ----> startKey range if startCmp == 0 && !startInclude { - goto next + return true } //ignore stopPrefix ----> stopKey range - if len(stop) > 0 && ( kv.Key(member).Cmp(stop) > 0 || kv.Key(member).Cmp(stop) == 0 && !stopInclude) { - goto next + if len(stop) > 0 && stopCmp > 0 || stopCmp == 0 && !stopInclude { + return true } idx++ if offset > idx { - goto next - } - items = append(items,member) - if int64(len(items)) == count{ - break + return true } - - next: if err := iter.Next(); err != nil { - return nil,err + items = append(items, member) + if int64(len(items)) == count { + return false } + return true } - return items,nil + if err = zset.ZAnyOrderRangeByLex(start, stop, stopInclude, positiveOrder, f); err != nil { + return nil, err + } + return items, nil } - func (zset *ZSet) ZAnyOrderRange(start int64, stop int64, withScore bool, positiveOrder bool) ([][]byte, error) { if stop < 0 { if stop = zset.meta.Len + stop; stop < 0 { @@ -474,48 +460,72 @@ func (zset *ZSet) ZRem(members [][]byte) (int64, error) { return deleted, err } -func (zset *ZSet) ZRemRangeByLex(start []byte,startInclude bool, stop []byte,stopInclude bool)(int64,error){ - deleted := int64(0); +func (zset *ZSet) ZAnyOrderRangeByLex(start, stop []byte, stopInclude, positiveOrder bool, callback FnCall) error { dkey := DataKey(zset.txn.db, zset.meta.ID) - memPrefix := zsetMemberKey(dkey,[]byte{}) - startPrefix := zsetMemberKey(dkey,start) - stopPrefix := zsetMemberKey(dkey,stop) + memPrefix := zsetMemberKey(dkey, []byte{}) + startPrefix := zsetMemberKey(dkey, start) + stopPrefix := zsetMemberKey(dkey, stop) if stopInclude || len(stop) == 0 { stopPrefix = kv.Key(stopPrefix).PrefixNext() } - var( + + var ( iter Iterator - scoreKey []byte - err error + err error ) - if iter, err = zset.txn.t.Iter(startPrefix, stopPrefix);err != nil { - return deleted, err + if positiveOrder { + iter, err = zset.txn.t.Iter(startPrefix, stopPrefix) + } else { + iter, err = zset.txn.t.IterReverse(stopPrefix) + } + + if err != nil { + return err } - for iter.Valid() && iter.Key().HasPrefix(memPrefix){ - startCmp := iter.Key().Cmp(startPrefix) - stopCmp := iter.Key().Cmp(stopPrefix) - member := iter.Key()[len(memPrefix):] - if ( stopCmp > 0 ) { + for iter.Valid() && iter.Key().HasPrefix(memPrefix) { + if !callback(iter.Key(), iter.Value()) { break } + if err := iter.Next(); err != nil { + return err + } + } + return nil +} + +func (zset *ZSet) ZRemRangeByLex(start, stop []byte, startInclude, stopInclude bool) (int64, error) { + var ( + deleted int64 = 0 + dkey []byte = DataKey(zset.txn.db, zset.meta.ID) + memPrefix []byte = zsetMemberKey(dkey, []byte{}) + err error + ) + f := func(key, val []byte) bool { + member := key[len(memPrefix):] + startCmp := kv.Key(member).Cmp(start) + stopCmp := kv.Key(member).Cmp(stop) + if len(stop) > 0 && stopCmp > 0 { + return false + } if startCmp == 0 && !startInclude { - goto next + return true } - scoreKey = zsetScoreKey(dkey,iter.Value(),member) - - if err = zset.txn.t.Delete(scoreKey);err != nil { - return deleted,err + scoreKey := zsetScoreKey(dkey, val, member) + if err = zset.txn.t.Delete(scoreKey); err != nil { + return false } - if err = zset.txn.t.Delete(iter.Key());err != nil { - return deleted,err + if err = zset.txn.t.Delete(key); err != nil { + return false } deleted++ - next: if err := iter.Next(); err != nil { - return deleted,err - } + return true + } + + if err = zset.ZAnyOrderRangeByLex(start, stop, stopInclude, true, f); err != nil { + return deleted, err } if zset.meta.Len == deleted { mkey := MetaKey(zset.txn.db, zset.key) @@ -524,14 +534,13 @@ func (zset *ZSet) ZRemRangeByLex(start []byte,startInclude bool, stop []byte,sto return deleted, err } if zset.meta.Object.ExpireAt > 0 { - if err = unExpireAt(zset.txn.t, mkey, zset.meta.Object.ExpireAt);err != nil { - return deleted,err + if err = unExpireAt(zset.txn.t, mkey, zset.meta.Object.ExpireAt); err != nil { + return deleted, err } } - return deleted, nil } - return deleted,nil + return deleted, nil } func (zset *ZSet) ZCard() int64 { @@ -626,5 +635,3 @@ func zsetScoreKey(dkey []byte, score []byte, member []byte) []byte { scoreKey = append(scoreKey, member...) return scoreKey } - - From 7561f8c38b982ccd75c4ebdd4af8007cef34412c Mon Sep 17 00:00:00 2001 From: nio Date: Sat, 31 Jul 2021 12:52:22 +0800 Subject: [PATCH 5/6] feaut: add zlexcount --- command/init.go | 16 ++++++++-------- command/zsets.go | 23 +++++++++++++++++++++++ db/zset.go | 2 -- docs/command_list.md | 2 +- 4 files changed, 32 insertions(+), 11 deletions(-) diff --git a/command/init.go b/command/init.go index 46a27ab..3796d0c 100644 --- a/command/init.go +++ b/command/init.go @@ -124,16 +124,16 @@ func init() { "zrevrange": Desc{Proc: AutoCommit(ZRevRange), Txn: ZRevRange, Cons: Constraint{-4, flags("rF"), 1, 1, 1}}, "zrangebyscore": Desc{Proc: AutoCommit(ZRangeByScore), Txn: ZRangeByScore, Cons: Constraint{-4, flags("rF"), 1, 1, 1}}, "zrevrangebyscore": Desc{Proc: AutoCommit(ZRevRangeByScore), Txn: ZRevRangeByScore, Cons: Constraint{-4, flags("rF"), 1, 1, 1}}, - "zrangebylex": Desc{Proc: AutoCommit(ZRangeByLex), Txn: ZRangeByScore, Cons: Constraint{-4, flags("rF"), 1, 1, 1}}, + "zrangebylex": Desc{Proc: AutoCommit(ZRangeByLex), Txn: ZRangeByScore, Cons: Constraint{-4, flags("rF"), 1, 1, 1}}, "zrevrangebylex": Desc{Proc: AutoCommit(ZRevRangeByLex), Txn: ZRevRangeByScore, Cons: Constraint{-4, flags("rF"), 1, 1, 1}}, - - "zrem": Desc{Proc: AutoCommit(ZRem), Txn: ZRem, Cons: Constraint{-3, flags("wF"), 1, 1, 1}}, - "zremrangebylex": Desc{Proc: AutoCommit(ZRemRangeByLex), Txn: ZRemRangeByLex, Cons: Constraint{-4, flags("wF"), 1, 1, 1}}, - "zcard": Desc{Proc: AutoCommit(ZCard), Txn: ZCard, Cons: Constraint{2, flags("rF"), 1, 1, 1}}, - "zcount": Desc{Proc: AutoCommit(ZCount), Txn: ZCount, Cons: Constraint{-4, flags("rF"), 1, 1, 1}}, - "zscore": Desc{Proc: AutoCommit(ZScore), Txn: ZScore, Cons: Constraint{3, flags("rF"), 1, 1, 1}}, - "zscan": Desc{Proc: AutoCommit(ZScan), Txn: ZScan, Cons: Constraint{-3, flags("rF"), 1, 1, 1}}, + "zrem": Desc{Proc: AutoCommit(ZRem), Txn: ZRem, Cons: Constraint{-3, flags("wF"), 1, 1, 1}}, + "zremrangebylex": Desc{Proc: AutoCommit(ZRemRangeByLex), Txn: ZRemRangeByLex, Cons: Constraint{-4, flags("wF"), 1, 1, 1}}, + "zcard": Desc{Proc: AutoCommit(ZCard), Txn: ZCard, Cons: Constraint{2, flags("rF"), 1, 1, 1}}, + "zcount": Desc{Proc: AutoCommit(ZCount), Txn: ZCount, Cons: Constraint{-4, flags("rF"), 1, 1, 1}}, + "zlexcount": Desc{Proc: AutoCommit(ZLexCount), Txn: ZLexCount, Cons: Constraint{-4, flags("rF"), 1, 1, 1}}, + "zscore": Desc{Proc: AutoCommit(ZScore), Txn: ZScore, Cons: Constraint{3, flags("rF"), 1, 1, 1}}, + "zscan": Desc{Proc: AutoCommit(ZScan), Txn: ZScan, Cons: Constraint{-3, flags("rF"), 1, 1, 1}}, // extension commands "escan": Desc{Proc: AutoCommit(Escan), Txn: Escan, Cons: Constraint{-1, flags("rR"), 0, 0, 0}}, diff --git a/command/zsets.go b/command/zsets.go index d766da6..94ef3bb 100644 --- a/command/zsets.go +++ b/command/zsets.go @@ -117,6 +117,29 @@ func ZRevRangeByLex(ctx *Context, txn *db.Transaction) (OnCommit, error) { return zAnyOrderRangeByLex(ctx, txn, false) } +func ZLexCount(ctx *Context, txn *db.Transaction) (OnCommit, error) { + key := []byte(ctx.Args[0]) + startKey, startInclude := getLexKeyAndInclude([]byte(ctx.Args[1])) + stopKey, stopInclude := getLexKeyAndInclude([]byte(ctx.Args[2])) + zset, err := txn.ZSet(key) + if err != nil { + if err == db.ErrTypeMismatch { + return nil, ErrTypeMismatch + } + return nil, errors.New("ERR " + err.Error()) + } + if !zset.Exist() { + return Integer(ctx.Out, 0), nil + } + + items, err := zset.ZOrderRangeByLex(startKey, stopKey, startInclude, stopInclude, 0, math.MaxInt64, true) + if err != nil { + return nil, errors.New("ERR " + err.Error()) + } + + return Integer(ctx.Out, int64(len(items))), nil +} + func ZCount(ctx *Context, txn *db.Transaction) (OnCommit, error) { key := []byte(ctx.Args[0]) startScore, startInclude, err := getFloatAndInclude(ctx.Args[1]) diff --git a/db/zset.go b/db/zset.go index c37bee2..3665cc7 100644 --- a/db/zset.go +++ b/db/zset.go @@ -191,10 +191,8 @@ func (zset *ZSet) ZOrderRangeByLex(start, stop []byte, startInclude, stopInclude ) dkey := DataKey(zset.txn.db, zset.meta.ID) memPrefix := zsetMemberKey(dkey, []byte{}) - zap.L().Info("zorderrange", zap.String("sr", string(start)), zap.String("so", string(stop)), zap.Bool("starin", startInclude), zap.Bool("stopin", stopInclude), zap.Bool("position", positiveOrder)) f := func(key, val []byte) bool { member := key[len(memPrefix):] - zap.L().Info("iterm", zap.String("K", string(key)), zap.String("v", string(val))) startCmp := kv.Key(member).Cmp(start) stopCmp := kv.Key(member).Cmp(stop) if len(stop) == 0 { diff --git a/docs/command_list.md b/docs/command_list.md index 437a4eb..3e2f391 100644 --- a/docs/command_list.md +++ b/docs/command_list.md @@ -142,7 +142,7 @@ - [x] zcount - [ ] zincrby - [ ] zinterstore -- [ ] zlexcount +- [x] zlexcount - [ ] zpopmax - [ ] zpopmin - [x] zrange From e1ca2a9e9ffd28a7d4a6a141de78382ee79e5a25 Mon Sep 17 00:00:00 2001 From: nio Date: Sat, 31 Jul 2021 14:12:50 +0800 Subject: [PATCH 6/6] add auto test --- tools/autotest/auto.go | 7 ++++ tools/autotest/cmd/zset.go | 82 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 89 insertions(+) diff --git a/tools/autotest/auto.go b/tools/autotest/auto.go index 11408a4..d12b775 100644 --- a/tools/autotest/auto.go +++ b/tools/autotest/auto.go @@ -192,6 +192,13 @@ func (ac *AutoClient) ZSetCase(t *testing.T) { ac.ez.ZScanEqual(t, "key-zset", "0", "*", 2, "member2 member4 -3.5 member5 0") ac.ez.ZScanEqual(t, "key-zset", "member2", "member*", 2, "member11 member2 1.5 member1 2") + ac.ez.ZAddEqual(t, "key-zset-lex", "0", "a", "0", "aa", "0", "abc", "0", "apple", "0", "b", "0", "c", "0", "d", "0", "d1", "0", "dd", "0", "dobble", "0", "z", "0", "z1") + ac.ez.ZLexCountEqual(t, "key-zset-lex", "[a", "(abc", 2) + ac.ez.ZRangeByLexEqual(t, "key-zset-lex", "[a", "(abc", "LIMIT 0 2", "a aa") + ac.ez.ZRevRangeByLexEqual(t, "key-zset-lex", "(z", "[d1", "LIMIT 0 2", "dobble dd") + ac.ez.ZRemRangeByLexEqual(t, "key-zset-lex", 3, "(aa", "[b") + ac.ek.DelEqual(t, 1, "key-zset-lex") + ac.ez.ZRemEqual(t, "key-zset", "member2", "member1", "member3", "member4", "member1") ac.ez.ZRangeEqual(t, "key-zset", 0, -1, true) diff --git a/tools/autotest/cmd/zset.go b/tools/autotest/cmd/zset.go index c2f5116..6e35f56 100644 --- a/tools/autotest/cmd/zset.go +++ b/tools/autotest/cmd/zset.go @@ -78,6 +78,31 @@ func (ez *ExampleZSet) ZAddEqualErr(t *testing.T, errValue string, args ...inter assert.EqualError(t, err, errValue) } +func (ez *ExampleZSet) ZRemRangeByLexEqual(t *testing.T, key string, deleted int, members ...string) { + req := make([]interface{}, 0, len(members)) + req = append(req, key) + for i := range members { + req = append(req, members[i]) + } + + reply, err := redis.Int(ez.conn.Do("zremrangebylex", req...)) + assert.Equal(t, deleted, reply) + assert.Nil(t, err) +} + +func (ez *ExampleZSet) ZRangeByLexEqual(t *testing.T, key string, start string, stop string, limit string, expected string) { + ez.ZAnyOrderRangeByLexEqual(t, key, start, stop, true, limit, expected) +} + +func (ez *ExampleZSet) ZRevRangeByLexEqual(t *testing.T, key string, start string, stop string, limit string, expected string) { + ez.ZAnyOrderRangeByLexEqual(t, key, start, stop, false, limit, expected) +} + +func (ez *ExampleZSet) ZRemRangeByLexEqualErr(t *testing.T, errValue string, args ...interface{}) { + _, err := ez.conn.Do("zremrangebylex", args...) + assert.EqualError(t, err, errValue) +} + func (ez *ExampleZSet) ZRemEqual(t *testing.T, key string, members ...string) { req := make([]interface{}, 0, len(members)) req = append(req, key) @@ -197,6 +222,23 @@ func (ez *ExampleZSet) ZScanEqual(t *testing.T, key string, cursor string, patte assert.Nil(t, err) } +func (ez *ExampleZSet) ZLexCountEqual(t *testing.T, key string, start string, stop string, expected int64) { + cmd := "zlexcount" + req := make([]interface{}, 0) + req = append(req, key) + req = append(req, start) + req = append(req, stop) + + reply, err := redis.Int64(ez.conn.Do(cmd, req...)) + assert.Equal(t, expected, reply) + assert.Nil(t, err) +} + +func (ez *ExampleZSet) ZLexCountEqualErr(t *testing.T, errValue string, args ...interface{}) { + _, err := ez.conn.Do("zlexcount", args...) + assert.EqualError(t, err, errValue) +} + func (ez *ExampleZSet) ZCountEqual(t *testing.T, key string, start string, stop string, expected int64) { cmd := "zcount" req := make([]interface{}, 0) @@ -222,6 +264,46 @@ func (ez *ExampleZSet) ZRevRangeByScoreEqual(t *testing.T, key string, start str ez.ZAnyOrderRangeByScoreEqual(t, key, start, stop, withScores, false, limit, expected) } +func (ez *ExampleZSet) ZAnyOrderRangeByLexEqual(t *testing.T, key string, start string, stop string, positiveOrder bool, limit string, expected string) { + cmd := "zrangebylex" + if !positiveOrder { + cmd = "zrevrangebylex" + } + + var reply []string + var err error + req := make([]interface{}, 0) + req = append(req, key) + req = append(req, start) + req = append(req, stop) + if limit != "" { + limitArgs := strings.Split(limit, " ") + for _, limitArg := range limitArgs { + req = append(req, limitArg) + } + } + + reply, err = redis.Strings(ez.conn.Do(cmd, req...)) + if expected != "" { + expectedStrs := strings.Split(expected, " ") + assert.Equal(t, expectedStrs, reply) + } else { + assert.Equal(t, []string{}, reply) + } + + assert.Nil(t, err) +} + +func (ez *ExampleZSet) ZRangeByLexEqualErr(t *testing.T, errValue string, args ...interface{}) { + _, err := ez.conn.Do("zrangebylex", args...) + assert.EqualError(t, err, errValue) +} + +func (ez *ExampleZSet) ZRevRangeByLexEqualErr(t *testing.T, errValue string, args ...interface{}) { + _, err := ez.conn.Do("zrevrangebyscore", args...) + assert.EqualError(t, err, errValue) +} + func (ez *ExampleZSet) ZAnyOrderRangeByScoreEqual(t *testing.T, key string, start string, stop string, withScores bool, positiveOrder bool, limit string, expected string) { cmd := "zrangebyscore" if !positiveOrder {