Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added ByScore, Limit for ZRANGE #1396

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions internal/eval/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,9 @@ const (
FILTERS string = "FILTER"
ITEMS string = "ITEMS"
EXPANSION string = "EXPANSION"
BYSCORE string = "BYSCORE"
BYLEX string = "BYLEX"
LIMIT string = "LIMIT"
PLUSINF string = "+inf"
MINUSINF string = "-inf"
)
49 changes: 49 additions & 0 deletions internal/eval/sortedset/sorted_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,3 +325,52 @@ func DeserializeSortedSet(buf *bytes.Reader) (*Set, error) {

return ss, nil
}

// GetRangeByScore returns a slice of members with scores between min and max, inclusive.
func (ss *Set) GetRangeByScore(minScore, maxScore float64, withScores, reverse bool, offset, count int) []string {
var result []string
index := 0
returned := 0

iterFunc := func(item btree.Item) bool {
ssi := item.(*Item)

if reverse {
if ssi.Score > maxScore {
return true
}
if ssi.Score < minScore {
return false
}
} else {
if ssi.Score < minScore {
return true
}
if ssi.Score > maxScore {
return false
}
}

if index >= offset {
result = append(result, ssi.Member)
if withScores {
scoreStr := strings.ToLower(strconv.FormatFloat(ssi.Score, 'g', -1, 64))
result = append(result, scoreStr)
}
returned++
if count > 0 && returned >= count {
return false
}
}
index++
return true
}

if reverse {
ss.tree.Descend(iterFunc)
} else {
ss.tree.Ascend(iterFunc)
}

return result
}
247 changes: 194 additions & 53 deletions internal/eval/store_eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -966,71 +966,212 @@ func evalZCOUNT(args []string, store *dstore.Store) *EvalResponse {
// The elements are considered to be ordered from the lowest to the highest score.
func evalZRANGE(args []string, store *dstore.Store) *EvalResponse {
if len(args) < 3 {
return &EvalResponse{
Result: nil,
Error: diceerrors.ErrWrongArgumentCount("ZRANGE"),
return &EvalResponse{Result: nil, Error: diceerrors.ErrWrongArgumentCount("ZRANGE")}
}

opts := parseOptions(args[3:])
if opts.error != nil {
return &EvalResponse{Result: nil, Error: opts.error}
}

if opts.byScore && opts.byLex {
return &EvalResponse{Result: nil, Error: diceerrors.ErrSyntax}
}

if opts.limit && !opts.byScore && !opts.byLex {
return &EvalResponse{Result: nil, Error: diceerrors.NewErr("ERR syntax error, LIMIT is only supported in combination with either BYSCORE or BYLEX")}
}

startStr, stopStr := args[1], args[2]
if !opts.byScore {
if err := validateRange(startStr, stopStr, opts.byScore, opts.byLex); err != nil {
return &EvalResponse{Result: nil, Error: err}
}
} else {
_, _, err := parseScoreRange(startStr, stopStr)
if err != nil {
return &EvalResponse{Result: nil, Error: err}
}
}

key := args[0]
startStr := args[1]
stopStr := args[2]
obj := store.Get(args[0])
if obj == nil {
return &EvalResponse{Result: []string{}, Error: nil}
}

withScores := false
reverse := false
for i := 3; i < len(args); i++ {
arg := strings.ToUpper(args[i])
if arg == WithScores {
withScores = true
} else if arg == REV {
reverse = true
} else {
return &EvalResponse{
Result: nil,
Error: diceerrors.ErrSyntax,
sortedSet, errMsg := sortedset.FromObject(obj)
if errMsg != nil {
return &EvalResponse{Result: nil, Error: diceerrors.ErrWrongTypeOperation}
}

result := getRange(sortedSet, startStr, stopStr, opts)

if opts.limit && !opts.byScore && !opts.byLex && opts.offset >= 0 {
if opts.offset < len(result) {
end := opts.offset + opts.count
if end > len(result) || opts.count < 0 {
end = len(result)
}
result = result[opts.offset:end]
} else {
result = []string{}
}
}

start, err := strconv.Atoi(startStr)
if err != nil {
return &EvalResponse{
Result: nil,
Error: diceerrors.ErrInvalidNumberFormat,
return &EvalResponse{Result: result, Error: nil}
}

type rangeOptions struct {
withScores bool
reverse bool
byScore bool
byLex bool
limit bool
offset int
count int
error error
}

func parseOptions(args []string) rangeOptions {
opts := rangeOptions{count: -1}

for i := 0; i < len(args); i++ {
switch strings.ToUpper(args[i]) {
case WithScores:
opts.withScores = true
case REV:
opts.reverse = true
case BYSCORE:
opts.byScore = true
case BYLEX:
opts.byLex = true
case LIMIT:
if i+2 >= len(args) {
opts.error = diceerrors.ErrSyntax
return opts
}

var err error
opts.offset, err = strconv.Atoi(args[i+1])
if err != nil {
opts.error = diceerrors.ErrIntegerOutOfRange
return opts
}

opts.count, err = strconv.Atoi(args[i+2])
if err != nil {
opts.error = diceerrors.ErrIntegerOutOfRange
return opts
}

opts.limit = true
i += 2
default:
opts.error = diceerrors.ErrSyntax
return opts
}
}

stop, err := strconv.Atoi(stopStr)
if err != nil {
return &EvalResponse{
Result: nil,
Error: diceerrors.ErrInvalidNumberFormat,
}
return opts
}

func validateRange(start, stop string, byScore, byLex bool) error {
if !byScore && !isValidRangeFormat(start, stop) {
return diceerrors.ErrIntegerOutOfRange
}

obj := store.Get(key)
if obj == nil {
return &EvalResponse{
Result: []string{},
Error: nil,
if !byLex {
if !isValidNumber(stripPrefix(start)) || !isValidNumber(stripPrefix(stop)) {
return diceerrors.ErrIntegerOutOfRange
}
}

sortedSet, errMsg := sortedset.FromObject(obj)
return nil
}

if errMsg != nil {
return &EvalResponse{
Result: nil,
Error: diceerrors.ErrWrongTypeOperation,
func isValidRangeFormat(start, stop string) bool {
return isSpecialRange(start) && isSpecialRange(stop)
}

func isSpecialRange(s string) bool {
return strings.HasPrefix(s, "(") || strings.HasPrefix(s, "[") ||
s == "-" || s == "+" || s == MINUSINF || s == PLUSINF ||
!(len(s) > 1 && s[0] == '0')
}

func stripPrefix(s string) string {
if strings.HasPrefix(s, "(") || strings.HasPrefix(s, "[") {
return s[1:]
}
return s
}

func isValidNumber(s string) bool {
if s == PLUSINF || s == MINUSINF {
return true
}
_, err := strconv.ParseInt(s, 10, 64)
return err == nil
}

func getRange(set *sortedset.Set, start, stop string, opts rangeOptions) []string {
if opts.byScore {
if opts.reverse {
start, stop = stop, start
}
rangeStart, rangeStop, err := parseScoreRange(start, stop)
if err != nil {
return []string{}
}
return set.GetRangeByScore(rangeStart, rangeStop, opts.withScores, opts.reverse, opts.offset, opts.count)
}

result := sortedSet.GetRange(start, stop, withScores, reverse)
startIdx, err := strconv.Atoi(start)
if err != nil {
return []string{}
}
stopIdx, err := strconv.Atoi(stop)
if err != nil {
return []string{}
}
return set.GetRange(startIdx, stopIdx, opts.withScores, opts.reverse)
}

return &EvalResponse{
Result: result,
Error: nil,
func parseScoreRange(minStr, maxStr string) (minScore, maxScore float64, err error) {
minScore = math.Inf(-1)
maxScore = math.Inf(1)

if minStr != MINUSINF {
if minStr[0] == '(' {
minScore, err = strconv.ParseFloat(minStr[1:], 64)
if err != nil {
return 0, 0, fmt.Errorf("ERR min or max is not a float")
}
minScore = math.Nextafter(minScore, math.Inf(1))
} else {
minScore, err = strconv.ParseFloat(minStr, 64)
if err != nil {
return 0, 0, fmt.Errorf("ERR min or max is not a float")
}
}
}

if maxStr != PLUSINF {
if maxStr[0] == '(' {
maxScore, err = strconv.ParseFloat(maxStr[1:], 64)
if err != nil {
return 0, 0, fmt.Errorf("ERR min or max is not a float")
}
maxScore = math.Nextafter(maxScore, math.Inf(-1))
} else {
maxScore, err = strconv.ParseFloat(maxStr, 64)
if err != nil {
return 0, 0, fmt.Errorf("ERR min or max is not a float")
}
}
}

return minScore, maxScore, nil
}

// evalZREM removes the specified members from the sorted set stored at key.
Expand Down Expand Up @@ -6959,9 +7100,9 @@ func evalJSONARRINDEX(args []string, store *dstore.Store) *EvalResponse {

adjustedStart, adjustedStop := adjustIndices(start, stop, length)

if adjustedStart == -1 {
arrIndexList = append(arrIndexList, -1)
continue
if adjustedStart == -1 {
arrIndexList = append(arrIndexList, -1)
continue
}

// Range [start, stop) : start is inclusive, stop is exclusive
Expand All @@ -6984,18 +7125,18 @@ func evalJSONARRINDEX(args []string, store *dstore.Store) *EvalResponse {
return makeEvalResult(arrIndexList)
}

// adjustIndices adjusts the start and stop indices for array traversal.
// It handles negative indices and ensures they are within the array bounds.
func adjustIndices(start, stop, length int) (adjustedStart, adjustedStop int) {
// adjustIndices adjusts the start and stop indices for array traversal.
// It handles negative indices and ensures they are within the array bounds.
func adjustIndices(start, stop, length int) (adjustedStart, adjustedStop int) {
if length == 0 {
return -1, -1
return -1, -1
}
if start < 0 {
start += length
start += length
}

if stop <= 0 {
stop += length
if stop <= 0 {
stop += length
}
if start < 0 {
start = 0
Expand Down
Loading