Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ruler: Add pagination for api/v1/rules #9563

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions pkg/ruler/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type Alert struct {
// RuleDiscovery has info for all rules
type RuleDiscovery struct {
RuleGroups []*RuleGroup `json:"groups"`
NextToken string `json:"nextToken,omitempty"`
}

// RuleGroup has info for rules which are part of a group
Expand Down Expand Up @@ -172,12 +173,23 @@ func (a *API) PrometheusRules(w http.ResponseWriter, req *http.Request) {
return
}

var maxGroups int
if maxGroupsVal := req.URL.Query().Get("max_groups"); maxGroupsVal != "" {
maxGroups, err = strconv.Atoi(maxGroupsVal)
if err != nil || maxGroups < 0 {
respondInvalidRequest(logger, w, "invalid max groups value")
return
}
}

rulesReq := RulesRequest{
Filter: AnyRule,
RuleName: req.URL.Query()["rule_name"],
RuleGroup: req.URL.Query()["rule_group"],
File: req.URL.Query()["file"],
ExcludeAlerts: excludeAlerts,
NextToken: req.URL.Query().Get("next_token"),
MaxGroups: int32(maxGroups),
}

ruleTypeFilter := strings.ToLower(req.URL.Query().Get("type"))
Expand All @@ -194,15 +206,14 @@ func (a *API) PrometheusRules(w http.ResponseWriter, req *http.Request) {
}

w.Header().Set("Content-Type", "application/json")
rgs, err := a.ruler.GetRules(ctx, rulesReq)
rgs, token, err := a.ruler.GetRules(ctx, rulesReq)

if err != nil {
respondServerError(logger, w, err.Error())
return
}

groups := make([]*RuleGroup, 0, len(rgs))

for _, g := range rgs {
grp := RuleGroup{
Name: g.Group.Name,
Expand Down Expand Up @@ -251,6 +262,7 @@ func (a *API) PrometheusRules(w http.ResponseWriter, req *http.Request) {
}
}
}

groups = append(groups, &grp)
}

Expand All @@ -261,7 +273,7 @@ func (a *API) PrometheusRules(w http.ResponseWriter, req *http.Request) {

b, err := json.Marshal(&response{
Status: "success",
Data: &RuleDiscovery{RuleGroups: groups},
Data: &RuleDiscovery{RuleGroups: groups, NextToken: token},
})
if err != nil {
level.Error(logger).Log("msg", "error marshaling json response", "err", err)
Expand All @@ -287,7 +299,6 @@ func parseExcludeAlerts(req *http.Request) (bool, error) {
}

return value, nil

}

func (a *API) PrometheusAlerts(w http.ResponseWriter, req *http.Request) {
Expand All @@ -302,7 +313,7 @@ func (a *API) PrometheusAlerts(w http.ResponseWriter, req *http.Request) {
}

w.Header().Set("Content-Type", "application/json")
rgs, err := a.ruler.GetRules(ctx, RulesRequest{Filter: AlertingRule})
rgs, _, err := a.ruler.GetRules(ctx, RulesRequest{Filter: AlertingRule})

if err != nil {
respondServerError(logger, w, err.Error())
Expand Down
144 changes: 142 additions & 2 deletions pkg/ruler/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@ import (
"errors"
"fmt"
"io"
"math/rand"
"net/http"
"net/http/httptest"
"net/url"
"slices"
"strconv"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -976,6 +979,143 @@ func TestRuler_PrometheusRules(t *testing.T) {
}
}

func TestRuler_PrometheusRulesPagination(t *testing.T) {
const (
userID = "user1"
interval = time.Minute
)

ruleGroups := rulespb.RuleGroupList{}
for ns := 0; ns < 3; ns++ {
for group := 0; group < 3; group++ {
g := &rulespb.RuleGroupDesc{
Name: fmt.Sprintf("test-group-%d", group),
Namespace: fmt.Sprintf("test-namespace-%d", ns),
User: userID,
Rules: []*rulespb.RuleDesc{
createAlertingRule("testalertingrule", "up < 1"),
},
Interval: interval,
}
ruleGroups = append(ruleGroups, g)
}
}

// Shuffle the rules before writing to check the sorting when they're returned
rand.Shuffle(len(ruleGroups), func(i, j int) { ruleGroups[i], ruleGroups[j] = ruleGroups[j], ruleGroups[i] })

cfg := defaultRulerConfig(t)
cfg.TenantFederation.Enabled = true

storageRules := map[string]rulespb.RuleGroupList{
userID: ruleGroups,
}

r := prepareRuler(t, cfg, newMockRuleStore(storageRules), withRulerAddrAutomaticMapping(), withLimits(validation.MockDefaultOverrides()), withStart())

// Sort them so we can compare against them
slices.SortFunc(ruleGroups, func(a, b *rulespb.RuleGroupDesc) int {
fileCompare := strings.Compare(a.Namespace, b.Namespace)

// If its 0, then the file names are the same.
// Lets look at the group names in that case.
if fileCompare != 0 {
return fileCompare
}
return strings.Compare(a.Name, b.Name)
})

// Rules will be synchronized asynchronously, so we wait until the expected number of rule groups
// has been synched.
test.Poll(t, 5*time.Second, len(ruleGroups), func() interface{} {
ctx := user.InjectOrgID(context.Background(), userID)
rls, _ := r.Rules(ctx, &RulesRequest{})
return len(rls.Groups)
})

a := NewAPI(r, r.store, log.NewNopLogger())

getRulesResponse := func(groupSize int, nextToken string) response {
queryParams := "?" + url.Values{
"max_groups": []string{strconv.Itoa(groupSize)},
"next_token": []string{nextToken},
}.Encode()
req := requestFor(t, http.MethodGet, "https://localhost:8080/prometheus/api/v1/rules"+queryParams, nil, userID)
w := httptest.NewRecorder()
a.PrometheusRules(w, req)

resp := w.Result()
body, _ := io.ReadAll(resp.Body)

r := response{}
err := json.Unmarshal(body, &r)
require.NoError(t, err)

return r
}

getRulesFromResponse := func(resp response) RuleDiscovery {
jsonRules, err := json.Marshal(resp.Data)
require.NoError(t, err)
returnedRules := RuleDiscovery{}
require.NoError(t, json.Unmarshal(jsonRules, &returnedRules))

return returnedRules
}

// No page size limit
resp := getRulesResponse(0, "")
require.Equal(t, "success", resp.Status)
rd := getRulesFromResponse(resp)
require.Len(t, rd.RuleGroups, len(ruleGroups))
require.Empty(t, rd.NextToken)

// We have 9 groups, keep fetching rules with a group page size of 2. The final
// page should have size 1 and an empty nextToken. Also check the groups are returned
// in order
var nextToken string
returnedRuleGroups := make([]*RuleGroup, 0, len(ruleGroups))
for i := 0; i < 4; i++ {
resp := getRulesResponse(2, nextToken)
require.Equal(t, "success", resp.Status)

rd := getRulesFromResponse(resp)
require.Len(t, rd.RuleGroups, 2)
require.NotEmpty(t, rd.NextToken)

returnedRuleGroups = append(returnedRuleGroups, rd.RuleGroups[0], rd.RuleGroups[1])
nextToken = rd.NextToken
}
resp = getRulesResponse(2, nextToken)
require.Equal(t, "success", resp.Status)

rd = getRulesFromResponse(resp)
require.Len(t, rd.RuleGroups, 1)
require.Empty(t, rd.NextToken)
returnedRuleGroups = append(returnedRuleGroups, rd.RuleGroups[0])

// Check the returned rules match the rules written
require.Equal(t, len(ruleGroups), len(returnedRuleGroups))
for i := 0; i < len(ruleGroups); i++ {
require.Equal(t, ruleGroups[i].Namespace, returnedRuleGroups[i].File)
require.Equal(t, ruleGroups[i].Name, returnedRuleGroups[i].Name)
require.Equal(t, len(ruleGroups[i].Rules), len(returnedRuleGroups[i].Rules))
for j := 0; j < len(ruleGroups[i].Rules); j++ {
jsonRule, err := json.Marshal(returnedRuleGroups[i].Rules[j])
require.NoError(t, err)
rule := alertingRule{}
require.NoError(t, json.Unmarshal(jsonRule, &rule))
require.Equal(t, ruleGroups[i].Rules[j].Alert, rule.Name)
}
}

// Invalid max groups value
resp = getRulesResponse(-1, "")
require.Equal(t, "error", resp.Status)
require.Equal(t, v1.ErrBadData, resp.ErrorType)
require.Equal(t, "invalid max groups value", resp.Error)
}

func TestRuler_PrometheusAlerts(t *testing.T) {
cfg := defaultRulerConfig(t)

Expand Down Expand Up @@ -1390,7 +1530,7 @@ func TestAPI_DeleteRuleGroup(t *testing.T) {

// Pre-condition check: the tenant should have 2 rule groups.
test.Poll(t, time.Second, 2, func() interface{} {
actualRuleGroups, err := r.GetRules(user.InjectOrgID(context.Background(), userID), RulesRequest{Filter: AnyRule})
actualRuleGroups, _, err := r.GetRules(user.InjectOrgID(context.Background(), userID), RulesRequest{Filter: AnyRule})
require.NoError(t, err)
return len(actualRuleGroups)
})
Expand All @@ -1408,7 +1548,7 @@ func TestAPI_DeleteRuleGroup(t *testing.T) {

// Ensure the rule group has been deleted.
test.Poll(t, time.Second, 1, func() interface{} {
actualRuleGroups, err := r.GetRules(user.InjectOrgID(context.Background(), userID), RulesRequest{Filter: AnyRule})
actualRuleGroups, _, err := r.GetRules(user.InjectOrgID(context.Background(), userID), RulesRequest{Filter: AnyRule})
require.NoError(t, err)
return len(actualRuleGroups)
})
Expand Down
62 changes: 56 additions & 6 deletions pkg/ruler/ruler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ package ruler

import (
"context"
"encoding/base64"
"flag"
"fmt"
"hash/fnv"
"net/http"
"net/url"
"path/filepath"
"slices"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -951,10 +953,10 @@ func filterRuleGroupsByNotMissing(configs map[string]rulespb.RuleGroupList, miss
}

// GetRules retrieves the running rules from this ruler and all running rulers in the ring.
func (r *Ruler) GetRules(ctx context.Context, req RulesRequest) ([]*GroupStateDesc, error) {
func (r *Ruler) GetRules(ctx context.Context, req RulesRequest) ([]*GroupStateDesc, string, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice if function could return RulesResponse, just so we add the token to that, just so it's named.

Not sure how invasive that is, so it could be done in a separate or follow up PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline, but decided against this for now to prevent an API level struct being introduced to this function.

userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, fmt.Errorf("no user id found in context")
return nil, "", fmt.Errorf("no user id found in context")
}

rr := ring.ReadRing(r.ring)
Expand All @@ -965,7 +967,7 @@ func (r *Ruler) GetRules(ctx context.Context, req RulesRequest) ([]*GroupStateDe

ctx, err = user.InjectIntoGRPCRequest(ctx)
if err != nil {
return nil, fmt.Errorf("unable to inject user ID into grpc request, %v", err)
return nil, "", fmt.Errorf("unable to inject user ID into grpc request, %v", err)
}

var (
Expand Down Expand Up @@ -993,7 +995,31 @@ func (r *Ruler) GetRules(ctx context.Context, req RulesRequest) ([]*GroupStateDe
return nil
})

return merged, err
// If the request asks for pagination, we fetch req.MaxGroups number
// of rule groups from each replica. We then merge and sort these and
// take the top k (k = MaxGroups)
if req.MaxGroups > 0 {
slices.SortFunc(merged, func(a, b *GroupStateDesc) int {
nsCmp := strings.Compare(a.Group.Namespace, b.Group.Namespace)
if nsCmp != 0 {
return nsCmp
}

// If Namespaces are equal, check the group names
return strings.Compare(a.Group.Name, b.Group.Name)
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-blocking nit: this might be over-engineering..... but I see a potential issue where someone updates the sort strategy in one place, and not the other (in api.go). Could we share them somehow? Define a "SortableGroup" interface or something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed the sorting in api.go - the groups will now get sorted after they're merged in GetRules()


if len(merged) > int(req.MaxGroups) {
groupForToken := merged[req.MaxGroups]
return merged[:req.MaxGroups], getRuleGroupNextToken(groupForToken.Group.Namespace, groupForToken.Group.Name), err
}

// If len(merged) <= req.MaxGroups we are
// on the last page so there is no token to return
return merged, "", err
}

return merged, "", err
}

// SyncRules implements the gRPC Ruler service.
Expand Down Expand Up @@ -1068,18 +1094,27 @@ func (r *Ruler) getLocalRules(ctx context.Context, userID string, req RulesReque
groupSet := makeStringFilterSet(req.RuleGroup)
ruleSet := makeStringFilterSet(req.RuleName)

foundToken := false
for _, group := range groups {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rules are sorted at this point, right? From GetRules?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, it's done here:

slices.SortFunc(rgs, func(a, b *Group) int {

if groupSet.IsFiltered(group.Name()) {
continue
}

interval := group.Interval()

// The mapped filename is url path escaped encoded to make handling `/` characters easier
decodedNamespace, err := url.PathUnescape(strings.TrimPrefix(group.File(), prefix))
if err != nil {
return nil, errors.Wrap(err, "unable to decode rule filename")
}

if req.NextToken != "" && !foundToken {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if req.NextToken != "" && !foundToken {
// If a pagination token is provided, skip past groups until we reach the point of that token.
if req.NextToken != "" && !foundToken {

if !tokenGreaterThanOrEqual(getRuleGroupNextToken(decodedNamespace, group.Name()), req.NextToken) {
continue
}
foundToken = true
}

interval := group.Interval()

if fileSet.IsFiltered(decodedNamespace) {
continue
}
Expand Down Expand Up @@ -1171,12 +1206,27 @@ func (r *Ruler) getLocalRules(ctx context.Context, userID string, req RulesReque

// Prometheus does not return a rule group if it has no rules after filtering.
if len(groupDesc.ActiveRules) > 0 {
if req.MaxGroups > 0 && len(groupDescs) == int(req.MaxGroups)+1 {
break
}
groupDescs = append(groupDescs, groupDesc)
}
}
return groupDescs, nil
}

func getRuleGroupNextToken(file, group string) string {
return base64.URLEncoding.EncodeToString([]byte(file + "/" + group))
}

// Returns true if tokenA >= tokenB
func tokenGreaterThanOrEqual(tokenA string, tokenB string) bool {
decodedTokenA, _ := base64.URLEncoding.DecodeString(tokenA)
decodedTokenB, _ := base64.URLEncoding.DecodeString(tokenB)

return string(decodedTokenA) >= string(decodedTokenB)
}

// IsMaxRuleGroupsLimited returns true if there is a limit set for the max
// number of rule groups for the tenant and namespace.
func (r *Ruler) IsMaxRuleGroupsLimited(userID, namespace string) bool {
Expand Down
Loading
Loading