Skip to content

Commit

Permalink
Merge pull request #226 from distributedio/feature/zsets
Browse files Browse the repository at this point in the history
Feature/zsets
  • Loading branch information
shafreeck committed Oct 9, 2021
2 parents 7d7dc88 + e1ca2a9 commit 97868f5
Show file tree
Hide file tree
Showing 8 changed files with 362 additions and 12 deletions.
22 changes: 22 additions & 0 deletions command/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,28 @@ func getFloatAndInclude(strf string) (float64, bool, error) {

}

func getLexKeyAndInclude(prefix []byte) ([]byte,bool) {
key := []byte{}
include := true
if string(prefix) == "+" || string(prefix) == "-" {
return key,include
}

if len(prefix) > 0 {
switch prefix[0]{
case byte('('):
key = prefix[1:]
include = false
case byte('['):
key = prefix[1:]
default:
key = prefix
}
}
return key,include
}


func getLimitParameters(offsetCount []string) (int64, int64, error) {
if len(offsetCount) < 2 {
return 0, 0, ErrSyntax
Expand Down
15 changes: 10 additions & 5 deletions command/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,16 @@ func init() {
"zrevrange": Desc{Proc: AutoCommit(ZRevRange), Txn: ZRevRange, Cons: Constraint{-4, flags("rF"), 1, 1, 1}},
"zrangebyscore": Desc{Proc: AutoCommit(ZRangeByScore), Txn: ZRangeByScore, Cons: Constraint{-4, flags("rF"), 1, 1, 1}},
"zrevrangebyscore": Desc{Proc: AutoCommit(ZRevRangeByScore), Txn: ZRevRangeByScore, Cons: Constraint{-4, flags("rF"), 1, 1, 1}},
"zrem": Desc{Proc: AutoCommit(ZRem), Txn: ZRem, Cons: Constraint{-3, flags("wF"), 1, 1, 1}},
"zcard": Desc{Proc: AutoCommit(ZCard), Txn: ZCard, Cons: Constraint{2, flags("rF"), 1, 1, 1}},
"zcount": Desc{Proc: AutoCommit(ZCount), Txn: ZCount, Cons: Constraint{-4, flags("rF"), 1, 1, 1}},
"zscore": Desc{Proc: AutoCommit(ZScore), Txn: ZScore, Cons: Constraint{3, flags("rF"), 1, 1, 1}},
"zscan": Desc{Proc: AutoCommit(ZScan), Txn: ZScan, Cons: Constraint{-3, flags("rF"), 1, 1, 1}},
"zrangebylex": Desc{Proc: AutoCommit(ZRangeByLex), Txn: ZRangeByScore, Cons: Constraint{-4, flags("rF"), 1, 1, 1}},
"zrevrangebylex": Desc{Proc: AutoCommit(ZRevRangeByLex), Txn: ZRevRangeByScore, Cons: Constraint{-4, flags("rF"), 1, 1, 1}},

"zrem": Desc{Proc: AutoCommit(ZRem), Txn: ZRem, Cons: Constraint{-3, flags("wF"), 1, 1, 1}},
"zremrangebylex": Desc{Proc: AutoCommit(ZRemRangeByLex), Txn: ZRemRangeByLex, Cons: Constraint{-4, flags("wF"), 1, 1, 1}},
"zcard": Desc{Proc: AutoCommit(ZCard), Txn: ZCard, Cons: Constraint{2, flags("rF"), 1, 1, 1}},
"zcount": Desc{Proc: AutoCommit(ZCount), Txn: ZCount, Cons: Constraint{-4, flags("rF"), 1, 1, 1}},
"zlexcount": Desc{Proc: AutoCommit(ZLexCount), Txn: ZLexCount, Cons: Constraint{-4, flags("rF"), 1, 1, 1}},
"zscore": Desc{Proc: AutoCommit(ZScore), Txn: ZScore, Cons: Constraint{3, flags("rF"), 1, 1, 1}},
"zscan": Desc{Proc: AutoCommit(ZScan), Txn: ZScan, Cons: Constraint{-3, flags("rF"), 1, 1, 1}},

// extension commands
"escan": Desc{Proc: AutoCommit(Escan), Txn: Escan, Cons: Constraint{-1, flags("rR"), 0, 0, 0}},
Expand Down
103 changes: 103 additions & 0 deletions command/zsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,37 @@ func ZRevRangeByScore(ctx *Context, txn *db.Transaction) (OnCommit, error) {
return zAnyOrderRangeByScore(ctx, txn, false)
}

func ZRangeByLex(ctx *Context, txn *db.Transaction) (OnCommit, error) {
return zAnyOrderRangeByLex(ctx, txn, true)
}

func ZRevRangeByLex(ctx *Context, txn *db.Transaction) (OnCommit, error) {
return zAnyOrderRangeByLex(ctx, txn, false)
}

func ZLexCount(ctx *Context, txn *db.Transaction) (OnCommit, error) {
key := []byte(ctx.Args[0])
startKey, startInclude := getLexKeyAndInclude([]byte(ctx.Args[1]))
stopKey, stopInclude := getLexKeyAndInclude([]byte(ctx.Args[2]))
zset, err := txn.ZSet(key)
if err != nil {
if err == db.ErrTypeMismatch {
return nil, ErrTypeMismatch
}
return nil, errors.New("ERR " + err.Error())
}
if !zset.Exist() {
return Integer(ctx.Out, 0), nil
}

items, err := zset.ZOrderRangeByLex(startKey, stopKey, startInclude, stopInclude, 0, math.MaxInt64, true)
if err != nil {
return nil, errors.New("ERR " + err.Error())
}

return Integer(ctx.Out, int64(len(items))), nil
}

func ZCount(ctx *Context, txn *db.Transaction) (OnCommit, error) {
key := []byte(ctx.Args[0])
startScore, startInclude, err := getFloatAndInclude(ctx.Args[1])
Expand Down Expand Up @@ -144,6 +175,55 @@ func ZCount(ctx *Context, txn *db.Transaction) (OnCommit, error) {
return Integer(ctx.Out, int64(len(items))), nil
}

func zAnyOrderRangeByLex(ctx *Context, txn *db.Transaction, positiveOrder bool) (OnCommit, error) {
key := []byte(ctx.Args[0])

startKey, startInclude := getLexKeyAndInclude([]byte(ctx.Args[1]))
stopKey, stopInclude := getLexKeyAndInclude([]byte(ctx.Args[2]))
if !positiveOrder {
startKey, startInclude = getLexKeyAndInclude([]byte(ctx.Args[2]))
stopKey, stopInclude = getLexKeyAndInclude([]byte(ctx.Args[1]))
}

var (
offset int64 = int64(0)
count int64 = math.MaxInt64
err error
)
for i := 3; i < len(ctx.Args); i++ {
switch strings.ToUpper(ctx.Args[i]) {
case "LIMIT":
if offset, count, err = getLimitParameters(ctx.Args[i+1:]); err != nil {
return nil, err
}
i += 2
break
default:
return nil, ErrSyntax
}
}

zset, err := txn.ZSet(key)
if err != nil {
if err == db.ErrTypeMismatch {
return nil, ErrTypeMismatch
}
return nil, errors.New("ERR " + err.Error())
}
if !zset.Exist() {
return BytesArray(ctx.Out, nil), nil
}

items, err := zset.ZOrderRangeByLex(startKey, stopKey, startInclude, stopInclude, offset, count, positiveOrder)
if err != nil {
return nil, errors.New("ERR " + err.Error())
}
if len(items) == 0 {
return BytesArray(ctx.Out, nil), nil
}
return BytesArray(ctx.Out, items), nil
}

func zAnyOrderRangeByScore(ctx *Context, txn *db.Transaction, positiveOrder bool) (OnCommit, error) {
key := []byte(ctx.Args[0])

Expand Down Expand Up @@ -231,6 +311,29 @@ func ZRem(ctx *Context, txn *db.Transaction) (OnCommit, error) {
return Integer(ctx.Out, deleted), nil
}

func ZRemRangeByLex(ctx *Context, txn *db.Transaction) (OnCommit, error) {
key := []byte(ctx.Args[0])
startKey, startInclude := getLexKeyAndInclude([]byte(ctx.Args[1]))
stopKey, stopInclude := getLexKeyAndInclude([]byte(ctx.Args[2]))
zset, err := txn.ZSet(key)
if err != nil {
if err == db.ErrTypeMismatch {
return nil, ErrTypeMismatch
}
return nil, errors.New("ERR " + err.Error())
}
if !zset.Exist() {
return Integer(ctx.Out, 0), nil
}

deleted, err := zset.ZRemRangeByLex(startKey, stopKey, startInclude, stopInclude)
if err != nil {
return nil, errors.New("ERR " + err.Error())
}

return Integer(ctx.Out, deleted), nil
}

func ZCard(ctx *Context, txn *db.Transaction) (OnCommit, error) {
key := []byte(ctx.Args[0])

Expand Down
3 changes: 3 additions & 0 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ var (
// Iterator store.Iterator
type Iterator store.Iterator

// Iterator Callback
type FnCall func(key, val []byte) bool

// DBID is the redis database ID
type DBID byte

Expand Down
128 changes: 128 additions & 0 deletions db/zset.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,50 @@ func (zset *ZSet) Exist() bool {
return zset.meta.Len != 0
}

func (zset *ZSet) ZOrderRangeByLex(start, stop []byte, startInclude, stopInclude bool, offset int64, count int64, positiveOrder bool) ([][]byte, error) {
var (
items [][]byte
idx int64 = -1
err error
)
dkey := DataKey(zset.txn.db, zset.meta.ID)
memPrefix := zsetMemberKey(dkey, []byte{})
f := func(key, val []byte) bool {
member := key[len(memPrefix):]
startCmp := kv.Key(member).Cmp(start)
stopCmp := kv.Key(member).Cmp(stop)
if len(stop) == 0 {
stopCmp = -1
}
if (!positiveOrder && startCmp < 0) || (positiveOrder && stopCmp > 0) {
return false
}
//ignore startPrefix ----> startKey range
if startCmp == 0 && !startInclude {
return true
}

//ignore stopPrefix ----> stopKey range
if len(stop) > 0 && stopCmp > 0 || stopCmp == 0 && !stopInclude {
return true
}

idx++
if offset > idx {
return true
}
items = append(items, member)
if int64(len(items)) == count {
return false
}
return true
}
if err = zset.ZAnyOrderRangeByLex(start, stop, stopInclude, positiveOrder, f); err != nil {
return nil, err
}
return items, nil
}

func (zset *ZSet) ZAnyOrderRange(start int64, stop int64, withScore bool, positiveOrder bool) ([][]byte, error) {
if stop < 0 {
if stop = zset.meta.Len + stop; stop < 0 {
Expand Down Expand Up @@ -413,6 +457,90 @@ func (zset *ZSet) ZRem(members [][]byte) (int64, error) {
zap.L().Debug("zrem update meta key", zap.Int64("cost(us)", time.Since(start).Nanoseconds()/1000))
return deleted, err
}

func (zset *ZSet) ZAnyOrderRangeByLex(start, stop []byte, stopInclude, positiveOrder bool, callback FnCall) error {
dkey := DataKey(zset.txn.db, zset.meta.ID)
memPrefix := zsetMemberKey(dkey, []byte{})
startPrefix := zsetMemberKey(dkey, start)
stopPrefix := zsetMemberKey(dkey, stop)
if stopInclude || len(stop) == 0 {
stopPrefix = kv.Key(stopPrefix).PrefixNext()
}

var (
iter Iterator
err error
)

if positiveOrder {
iter, err = zset.txn.t.Iter(startPrefix, stopPrefix)
} else {
iter, err = zset.txn.t.IterReverse(stopPrefix)
}

if err != nil {
return err
}

for iter.Valid() && iter.Key().HasPrefix(memPrefix) {
if !callback(iter.Key(), iter.Value()) {
break
}
if err := iter.Next(); err != nil {
return err
}
}
return nil
}

func (zset *ZSet) ZRemRangeByLex(start, stop []byte, startInclude, stopInclude bool) (int64, error) {
var (
deleted int64 = 0
dkey []byte = DataKey(zset.txn.db, zset.meta.ID)
memPrefix []byte = zsetMemberKey(dkey, []byte{})
err error
)

f := func(key, val []byte) bool {
member := key[len(memPrefix):]
startCmp := kv.Key(member).Cmp(start)
stopCmp := kv.Key(member).Cmp(stop)
if len(stop) > 0 && stopCmp > 0 {
return false
}
if startCmp == 0 && !startInclude {
return true
}
scoreKey := zsetScoreKey(dkey, val, member)
if err = zset.txn.t.Delete(scoreKey); err != nil {
return false
}
if err = zset.txn.t.Delete(key); err != nil {
return false
}
deleted++
return true
}

if err = zset.ZAnyOrderRangeByLex(start, stop, stopInclude, true, f); err != nil {
return deleted, err
}
if zset.meta.Len == deleted {
mkey := MetaKey(zset.txn.db, zset.key)
err = zset.txn.t.Delete(mkey)
if err != nil {
return deleted, err
}
if zset.meta.Object.ExpireAt > 0 {
if err = unExpireAt(zset.txn.t, mkey, zset.meta.Object.ExpireAt); err != nil {
return deleted, err
}
}
}

return deleted, nil
}

func (zset *ZSet) ZCard() int64 {
return zset.meta.Len
}
Expand Down
14 changes: 7 additions & 7 deletions docs/command_list.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,27 +139,27 @@
- [ ] bzpopmax
- [x] zadd
- [x] zcard
- [ ] zcount
- [x] zcount
- [ ] zincrby
- [ ] zinterstore
- [ ] zlexcount
- [x] zlexcount
- [ ] zpopmax
- [ ] zpopmin
- [x] zrange
- [ ] zrangebylex
- [ ] zrevrangebylex
- [x] zrangebylex
- [x] zrevrangebylex
- [x] zrangebyscore
- [ ] zrank
- [x] zrem
- [ ] zremrangebylex
- [x] zremrangebylex
- [ ] zremrangebyrank
- [ ] zremrangebyscore
- [x] zrevrange
- [ ] zrevrangebyscore
- [x] zrevrangebyscore
- [ ] zrevrank
- [x] zscore
- [ ] zunionstore
- [ ] zscan
- [x] zscan

### Geo

Expand Down
7 changes: 7 additions & 0 deletions tools/autotest/auto.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,13 @@ func (ac *AutoClient) ZSetCase(t *testing.T) {
ac.ez.ZScanEqual(t, "key-zset", "0", "*", 2, "member2 member4 -3.5 member5 0")
ac.ez.ZScanEqual(t, "key-zset", "member2", "member*", 2, "member11 member2 1.5 member1 2")

ac.ez.ZAddEqual(t, "key-zset-lex", "0", "a", "0", "aa", "0", "abc", "0", "apple", "0", "b", "0", "c", "0", "d", "0", "d1", "0", "dd", "0", "dobble", "0", "z", "0", "z1")
ac.ez.ZLexCountEqual(t, "key-zset-lex", "[a", "(abc", 2)
ac.ez.ZRangeByLexEqual(t, "key-zset-lex", "[a", "(abc", "LIMIT 0 2", "a aa")
ac.ez.ZRevRangeByLexEqual(t, "key-zset-lex", "(z", "[d1", "LIMIT 0 2", "dobble dd")
ac.ez.ZRemRangeByLexEqual(t, "key-zset-lex", 3, "(aa", "[b")
ac.ek.DelEqual(t, 1, "key-zset-lex")

ac.ez.ZRemEqual(t, "key-zset", "member2", "member1", "member3", "member4", "member1")
ac.ez.ZRangeEqual(t, "key-zset", 0, -1, true)

Expand Down
Loading

0 comments on commit 97868f5

Please sign in to comment.