From baeb8d9bb3e29cfe5524371797348e789ca494ae Mon Sep 17 00:00:00 2001 From: fayzal-g Date: Wed, 9 Oct 2024 11:21:05 +0100 Subject: [PATCH 1/3] Changes in single commit --- pkg/ruler/api.go | 21 +++- pkg/ruler/api_test.go | 144 ++++++++++++++++++++++++++- pkg/ruler/ruler.go | 62 ++++++++++-- pkg/ruler/ruler.pb.go | 214 +++++++++++++++++++++++++++++----------- pkg/ruler/ruler.proto | 2 + pkg/ruler/ruler_test.go | 16 +-- 6 files changed, 380 insertions(+), 79 deletions(-) diff --git a/pkg/ruler/api.go b/pkg/ruler/api.go index 1d205c421b2..a55924ce3a6 100644 --- a/pkg/ruler/api.go +++ b/pkg/ruler/api.go @@ -64,6 +64,7 @@ type Alert struct { // RuleDiscovery has info for all rules type RuleDiscovery struct { RuleGroups []*RuleGroup `json:"groups"` + NextToken string `json:"nextToken,omitempty"` } // RuleGroup has info for rules which are part of a group @@ -172,12 +173,23 @@ func (a *API) PrometheusRules(w http.ResponseWriter, req *http.Request) { return } + var maxGroups int + if maxGroupsVal := req.URL.Query().Get("max_groups"); maxGroupsVal != "" { + maxGroups, err = strconv.Atoi(maxGroupsVal) + if err != nil || maxGroups < 0 { + respondInvalidRequest(logger, w, "invalid max groups value") + return + } + } + rulesReq := RulesRequest{ Filter: AnyRule, RuleName: req.URL.Query()["rule_name"], RuleGroup: req.URL.Query()["rule_group"], File: req.URL.Query()["file"], ExcludeAlerts: excludeAlerts, + NextToken: req.URL.Query().Get("next_token"), + MaxGroups: int32(maxGroups), } ruleTypeFilter := strings.ToLower(req.URL.Query().Get("type")) @@ -194,7 +206,7 @@ func (a *API) PrometheusRules(w http.ResponseWriter, req *http.Request) { } w.Header().Set("Content-Type", "application/json") - rgs, err := a.ruler.GetRules(ctx, rulesReq) + rgs, token, err := a.ruler.GetRules(ctx, rulesReq) if err != nil { respondServerError(logger, w, err.Error()) @@ -202,7 +214,6 @@ func (a *API) PrometheusRules(w http.ResponseWriter, req *http.Request) { } groups := make([]*RuleGroup, 0, len(rgs)) - for _, g := range rgs { grp := RuleGroup{ Name: g.Group.Name, @@ -251,6 +262,7 @@ func (a *API) PrometheusRules(w http.ResponseWriter, req *http.Request) { } } } + groups = append(groups, &grp) } @@ -261,7 +273,7 @@ func (a *API) PrometheusRules(w http.ResponseWriter, req *http.Request) { b, err := json.Marshal(&response{ Status: "success", - Data: &RuleDiscovery{RuleGroups: groups}, + Data: &RuleDiscovery{RuleGroups: groups, NextToken: token}, }) if err != nil { level.Error(logger).Log("msg", "error marshaling json response", "err", err) @@ -287,7 +299,6 @@ func parseExcludeAlerts(req *http.Request) (bool, error) { } return value, nil - } func (a *API) PrometheusAlerts(w http.ResponseWriter, req *http.Request) { @@ -302,7 +313,7 @@ func (a *API) PrometheusAlerts(w http.ResponseWriter, req *http.Request) { } w.Header().Set("Content-Type", "application/json") - rgs, err := a.ruler.GetRules(ctx, RulesRequest{Filter: AlertingRule}) + rgs, _, err := a.ruler.GetRules(ctx, RulesRequest{Filter: AlertingRule}) if err != nil { respondServerError(logger, w, err.Error()) diff --git a/pkg/ruler/api_test.go b/pkg/ruler/api_test.go index d445b32f7ef..7e7ddb36b5b 100644 --- a/pkg/ruler/api_test.go +++ b/pkg/ruler/api_test.go @@ -11,9 +11,12 @@ import ( "errors" "fmt" "io" + "math/rand" "net/http" "net/http/httptest" "net/url" + "slices" + "strconv" "strings" "testing" "time" @@ -976,6 +979,143 @@ func TestRuler_PrometheusRules(t *testing.T) { } } +func TestRuler_PrometheusRulesPagination(t *testing.T) { + const ( + userID = "user1" + interval = time.Minute + ) + + ruleGroups := rulespb.RuleGroupList{} + for ns := 0; ns < 3; ns++ { + for group := 0; group < 3; group++ { + g := &rulespb.RuleGroupDesc{ + Name: fmt.Sprintf("test-group-%d", group), + Namespace: fmt.Sprintf("test-namespace-%d", ns), + User: userID, + Rules: []*rulespb.RuleDesc{ + createAlertingRule("testalertingrule", "up < 1"), + }, + Interval: interval, + } + ruleGroups = append(ruleGroups, g) + } + } + + // Shuffle the rules before writing to check the sorting when they're returned + rand.Shuffle(len(ruleGroups), func(i, j int) { ruleGroups[i], ruleGroups[j] = ruleGroups[j], ruleGroups[i] }) + + cfg := defaultRulerConfig(t) + cfg.TenantFederation.Enabled = true + + storageRules := map[string]rulespb.RuleGroupList{ + userID: ruleGroups, + } + + r := prepareRuler(t, cfg, newMockRuleStore(storageRules), withRulerAddrAutomaticMapping(), withLimits(validation.MockDefaultOverrides()), withStart()) + + // Sort them so we can compare against them + slices.SortFunc(ruleGroups, func(a, b *rulespb.RuleGroupDesc) int { + fileCompare := strings.Compare(a.Namespace, b.Namespace) + + // If its 0, then the file names are the same. + // Lets look at the group names in that case. + if fileCompare != 0 { + return fileCompare + } + return strings.Compare(a.Name, b.Name) + }) + + // Rules will be synchronized asynchronously, so we wait until the expected number of rule groups + // has been synched. + test.Poll(t, 5*time.Second, len(ruleGroups), func() interface{} { + ctx := user.InjectOrgID(context.Background(), userID) + rls, _ := r.Rules(ctx, &RulesRequest{}) + return len(rls.Groups) + }) + + a := NewAPI(r, r.store, log.NewNopLogger()) + + getRulesResponse := func(groupSize int, nextToken string) response { + queryParams := "?" + url.Values{ + "max_groups": []string{strconv.Itoa(groupSize)}, + "next_token": []string{nextToken}, + }.Encode() + req := requestFor(t, http.MethodGet, "https://localhost:8080/prometheus/api/v1/rules"+queryParams, nil, userID) + w := httptest.NewRecorder() + a.PrometheusRules(w, req) + + resp := w.Result() + body, _ := io.ReadAll(resp.Body) + + r := response{} + err := json.Unmarshal(body, &r) + require.NoError(t, err) + + return r + } + + getRulesFromResponse := func(resp response) RuleDiscovery { + jsonRules, err := json.Marshal(resp.Data) + require.NoError(t, err) + returnedRules := RuleDiscovery{} + require.NoError(t, json.Unmarshal(jsonRules, &returnedRules)) + + return returnedRules + } + + // No page size limit + resp := getRulesResponse(0, "") + require.Equal(t, "success", resp.Status) + rd := getRulesFromResponse(resp) + require.Len(t, rd.RuleGroups, len(ruleGroups)) + require.Empty(t, rd.NextToken) + + // We have 9 groups, keep fetching rules with a group page size of 2. The final + // page should have size 1 and an empty nextToken. Also check the groups are returned + // in order + var nextToken string + returnedRuleGroups := make([]*RuleGroup, 0, len(ruleGroups)) + for i := 0; i < 4; i++ { + resp := getRulesResponse(2, nextToken) + require.Equal(t, "success", resp.Status) + + rd := getRulesFromResponse(resp) + require.Len(t, rd.RuleGroups, 2) + require.NotEmpty(t, rd.NextToken) + + returnedRuleGroups = append(returnedRuleGroups, rd.RuleGroups[0], rd.RuleGroups[1]) + nextToken = rd.NextToken + } + resp = getRulesResponse(2, nextToken) + require.Equal(t, "success", resp.Status) + + rd = getRulesFromResponse(resp) + require.Len(t, rd.RuleGroups, 1) + require.Empty(t, rd.NextToken) + returnedRuleGroups = append(returnedRuleGroups, rd.RuleGroups[0]) + + // Check the returned rules match the rules written + require.Equal(t, len(ruleGroups), len(returnedRuleGroups)) + for i := 0; i < len(ruleGroups); i++ { + require.Equal(t, ruleGroups[i].Namespace, returnedRuleGroups[i].File) + require.Equal(t, ruleGroups[i].Name, returnedRuleGroups[i].Name) + require.Equal(t, len(ruleGroups[i].Rules), len(returnedRuleGroups[i].Rules)) + for j := 0; j < len(ruleGroups[i].Rules); j++ { + jsonRule, err := json.Marshal(returnedRuleGroups[i].Rules[j]) + require.NoError(t, err) + rule := alertingRule{} + require.NoError(t, json.Unmarshal(jsonRule, &rule)) + require.Equal(t, ruleGroups[i].Rules[j].Alert, rule.Name) + } + } + + // Invalid max groups value + resp = getRulesResponse(-1, "") + require.Equal(t, "error", resp.Status) + require.Equal(t, v1.ErrBadData, resp.ErrorType) + require.Equal(t, "invalid max groups value", resp.Error) +} + func TestRuler_PrometheusAlerts(t *testing.T) { cfg := defaultRulerConfig(t) @@ -1390,7 +1530,7 @@ func TestAPI_DeleteRuleGroup(t *testing.T) { // Pre-condition check: the tenant should have 2 rule groups. test.Poll(t, time.Second, 2, func() interface{} { - actualRuleGroups, err := r.GetRules(user.InjectOrgID(context.Background(), userID), RulesRequest{Filter: AnyRule}) + actualRuleGroups, _, err := r.GetRules(user.InjectOrgID(context.Background(), userID), RulesRequest{Filter: AnyRule}) require.NoError(t, err) return len(actualRuleGroups) }) @@ -1408,7 +1548,7 @@ func TestAPI_DeleteRuleGroup(t *testing.T) { // Ensure the rule group has been deleted. test.Poll(t, time.Second, 1, func() interface{} { - actualRuleGroups, err := r.GetRules(user.InjectOrgID(context.Background(), userID), RulesRequest{Filter: AnyRule}) + actualRuleGroups, _, err := r.GetRules(user.InjectOrgID(context.Background(), userID), RulesRequest{Filter: AnyRule}) require.NoError(t, err) return len(actualRuleGroups) }) diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index 9c8cddf1688..bd4d4907058 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -7,12 +7,14 @@ package ruler import ( "context" + "encoding/base64" "flag" "fmt" "hash/fnv" "net/http" "net/url" "path/filepath" + "slices" "strings" "sync" "time" @@ -951,10 +953,10 @@ func filterRuleGroupsByNotMissing(configs map[string]rulespb.RuleGroupList, miss } // GetRules retrieves the running rules from this ruler and all running rulers in the ring. -func (r *Ruler) GetRules(ctx context.Context, req RulesRequest) ([]*GroupStateDesc, error) { +func (r *Ruler) GetRules(ctx context.Context, req RulesRequest) ([]*GroupStateDesc, string, error) { userID, err := tenant.TenantID(ctx) if err != nil { - return nil, fmt.Errorf("no user id found in context") + return nil, "", fmt.Errorf("no user id found in context") } rr := ring.ReadRing(r.ring) @@ -965,7 +967,7 @@ func (r *Ruler) GetRules(ctx context.Context, req RulesRequest) ([]*GroupStateDe ctx, err = user.InjectIntoGRPCRequest(ctx) if err != nil { - return nil, fmt.Errorf("unable to inject user ID into grpc request, %v", err) + return nil, "", fmt.Errorf("unable to inject user ID into grpc request, %v", err) } var ( @@ -993,7 +995,31 @@ func (r *Ruler) GetRules(ctx context.Context, req RulesRequest) ([]*GroupStateDe return nil }) - return merged, err + // If the request asks for pagination, we fetch req.MaxGroups number + // of rule groups from each replica. We then merge and sort these and + // take the top k (k = MaxGroups) + if req.MaxGroups > 0 { + slices.SortFunc(merged, func(a, b *GroupStateDesc) int { + nsCmp := strings.Compare(a.Group.Namespace, b.Group.Namespace) + if nsCmp != 0 { + return nsCmp + } + + // If Namespaces are equal, check the group names + return strings.Compare(a.Group.Name, b.Group.Name) + }) + + if len(merged) > int(req.MaxGroups) { + groupForToken := merged[req.MaxGroups] + return merged[:req.MaxGroups], getRuleGroupNextToken(groupForToken.Group.Namespace, groupForToken.Group.Name), err + } + + // If len(merged) <= req.MaxGroups we are + // on the last page so there is no token to return + return merged, "", err + } + + return merged, "", err } // SyncRules implements the gRPC Ruler service. @@ -1068,18 +1094,27 @@ func (r *Ruler) getLocalRules(ctx context.Context, userID string, req RulesReque groupSet := makeStringFilterSet(req.RuleGroup) ruleSet := makeStringFilterSet(req.RuleName) + foundToken := false for _, group := range groups { if groupSet.IsFiltered(group.Name()) { continue } - interval := group.Interval() - // The mapped filename is url path escaped encoded to make handling `/` characters easier decodedNamespace, err := url.PathUnescape(strings.TrimPrefix(group.File(), prefix)) if err != nil { return nil, errors.Wrap(err, "unable to decode rule filename") } + + if req.NextToken != "" && !foundToken { + if !tokenGreaterThanOrEqual(getRuleGroupNextToken(decodedNamespace, group.Name()), req.NextToken) { + continue + } + foundToken = true + } + + interval := group.Interval() + if fileSet.IsFiltered(decodedNamespace) { continue } @@ -1171,12 +1206,27 @@ func (r *Ruler) getLocalRules(ctx context.Context, userID string, req RulesReque // Prometheus does not return a rule group if it has no rules after filtering. if len(groupDesc.ActiveRules) > 0 { + if req.MaxGroups > 0 && len(groupDescs) == int(req.MaxGroups)+1 { + break + } groupDescs = append(groupDescs, groupDesc) } } return groupDescs, nil } +func getRuleGroupNextToken(file, group string) string { + return base64.URLEncoding.EncodeToString([]byte(file + "/" + group)) +} + +// Returns true if tokenA >= tokenB +func tokenGreaterThanOrEqual(tokenA string, tokenB string) bool { + decodedTokenA, _ := base64.URLEncoding.DecodeString(tokenA) + decodedTokenB, _ := base64.URLEncoding.DecodeString(tokenB) + + return string(decodedTokenA) >= string(decodedTokenB) +} + // IsMaxRuleGroupsLimited returns true if there is a limit set for the max // number of rule groups for the tenant and namespace. func (r *Ruler) IsMaxRuleGroupsLimited(userID, namespace string) bool { diff --git a/pkg/ruler/ruler.pb.go b/pkg/ruler/ruler.pb.go index 59d3f8df60b..73d87f439ec 100644 --- a/pkg/ruler/ruler.pb.go +++ b/pkg/ruler/ruler.pb.go @@ -69,6 +69,8 @@ type RulesRequest struct { RuleGroup []string `protobuf:"bytes,3,rep,name=rule_group,json=ruleGroup,proto3" json:"rule_group,omitempty"` File []string `protobuf:"bytes,4,rep,name=file,proto3" json:"file,omitempty"` ExcludeAlerts bool `protobuf:"varint,5,opt,name=exclude_alerts,json=excludeAlerts,proto3" json:"exclude_alerts,omitempty"` + MaxGroups int32 `protobuf:"varint,6,opt,name=max_groups,json=maxGroups,proto3" json:"max_groups,omitempty"` + NextToken string `protobuf:"bytes,7,opt,name=next_token,json=nextToken,proto3" json:"next_token,omitempty"` } func (m *RulesRequest) Reset() { *m = RulesRequest{} } @@ -138,6 +140,20 @@ func (m *RulesRequest) GetExcludeAlerts() bool { return false } +func (m *RulesRequest) GetMaxGroups() int32 { + if m != nil { + return m.MaxGroups + } + return 0 +} + +func (m *RulesRequest) GetNextToken() string { + if m != nil { + return m.NextToken + } + return "" +} + type RulesResponse struct { Groups []*GroupStateDesc `protobuf:"bytes,1,rep,name=groups,proto3" json:"groups,omitempty"` } @@ -537,63 +553,65 @@ func init() { func init() { proto.RegisterFile("ruler.proto", fileDescriptor_9ecbec0a4cfddea6) } var fileDescriptor_9ecbec0a4cfddea6 = []byte{ - // 891 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x55, 0xcf, 0x6f, 0x1b, 0x45, - 0x14, 0xde, 0x71, 0xe2, 0x1f, 0xfb, 0x9c, 0xa4, 0xc9, 0x24, 0xc0, 0xd6, 0x94, 0x8d, 0x65, 0x84, - 0x64, 0x21, 0x65, 0x03, 0x21, 0x02, 0x21, 0x21, 0xc0, 0x51, 0x5b, 0x84, 0x84, 0x50, 0xb5, 0x2e, - 0x5c, 0xad, 0xb1, 0x3d, 0xde, 0x8c, 0xba, 0xde, 0x5d, 0x66, 0x66, 0xa3, 0xe6, 0x04, 0x7f, 0x42, - 0x8f, 0x9c, 0x39, 0xf1, 0x77, 0x70, 0xea, 0x31, 0xc7, 0x0a, 0xa1, 0x42, 0x9c, 0x0b, 0xc7, 0x5e, - 0xb9, 0xa1, 0x79, 0xb3, 0x1b, 0x3b, 0x6d, 0x40, 0xb5, 0xa0, 0x97, 0x78, 0xde, 0x7b, 0xdf, 0xf7, - 0xcd, 0xfb, 0xb5, 0x13, 0x68, 0xca, 0x3c, 0xe6, 0x32, 0xc8, 0x64, 0xaa, 0x53, 0x5a, 0x45, 0xa3, - 0xb5, 0x17, 0x09, 0x7d, 0x9c, 0x0f, 0x83, 0x51, 0x3a, 0xdd, 0x8f, 0xd2, 0x28, 0xdd, 0xc7, 0xe8, - 0x30, 0x9f, 0xa0, 0x85, 0x06, 0x9e, 0x2c, 0xab, 0xe5, 0x47, 0x69, 0x1a, 0xc5, 0x7c, 0x8e, 0x1a, - 0xe7, 0x92, 0x69, 0x91, 0x26, 0x45, 0x7c, 0xf7, 0xf9, 0xb8, 0x16, 0x53, 0xae, 0x34, 0x9b, 0x66, - 0x05, 0xe0, 0xbd, 0xc5, 0xfb, 0x24, 0x9b, 0xb0, 0x84, 0xed, 0x4f, 0xc5, 0x54, 0xc8, 0xfd, 0xec, - 0x41, 0x64, 0x4f, 0xd9, 0xd0, 0xfe, 0x16, 0x8c, 0x0f, 0xff, 0x95, 0x81, 0x55, 0xe0, 0x5f, 0x95, - 0x0d, 0xed, 0xaf, 0xe5, 0x75, 0xfe, 0x22, 0xb0, 0x16, 0x1a, 0x3b, 0xe4, 0xdf, 0xe5, 0x5c, 0x69, - 0x7a, 0x08, 0xb5, 0x89, 0x88, 0x35, 0x97, 0x1e, 0x69, 0x93, 0xee, 0xc6, 0xc1, 0xad, 0xc0, 0xf6, - 0x63, 0x11, 0x84, 0xc6, 0xfd, 0xd3, 0x8c, 0x87, 0x05, 0x96, 0xbe, 0x09, 0xae, 0x81, 0x0d, 0x12, - 0x36, 0xe5, 0x5e, 0xa5, 0xbd, 0xd2, 0x75, 0xc3, 0x86, 0x71, 0x7c, 0xcd, 0xa6, 0x9c, 0xbe, 0x05, - 0x80, 0xc1, 0x48, 0xa6, 0x79, 0xe6, 0xad, 0x60, 0x14, 0xe1, 0x5f, 0x18, 0x07, 0xa5, 0xb0, 0x3a, - 0x11, 0x31, 0xf7, 0x56, 0x31, 0x80, 0x67, 0xfa, 0x0e, 0x6c, 0xf0, 0x87, 0xa3, 0x38, 0x1f, 0xf3, - 0x01, 0x8b, 0xb9, 0xd4, 0xca, 0xab, 0xb6, 0x49, 0xb7, 0x11, 0xae, 0x17, 0xde, 0x1e, 0x3a, 0x3b, - 0x9f, 0x40, 0xa3, 0x4c, 0x85, 0x36, 0xa1, 0xde, 0x4b, 0x4e, 0x8d, 0xb9, 0xe9, 0xd0, 0x4d, 0x58, - 0x43, 0x88, 0x48, 0x22, 0xf4, 0x10, 0xba, 0x05, 0xeb, 0x21, 0x1f, 0xa5, 0x72, 0x5c, 0xba, 0x2a, - 0x9d, 0x4f, 0x61, 0xbd, 0xa8, 0x4a, 0x65, 0x69, 0xa2, 0x38, 0xdd, 0x83, 0x1a, 0xe6, 0xa8, 0x3c, - 0xd2, 0x5e, 0xe9, 0x36, 0x0f, 0x5e, 0x2b, 0x6a, 0xc7, 0x3c, 0xfb, 0x9a, 0x69, 0x7e, 0x9b, 0xab, - 0x51, 0x58, 0x80, 0x3a, 0x7b, 0xb0, 0xd9, 0x3f, 0x4d, 0x46, 0x57, 0xda, 0x77, 0x13, 0x1a, 0xb9, - 0xe2, 0x72, 0x20, 0xc6, 0x56, 0xc4, 0x0d, 0xeb, 0xc6, 0xfe, 0x72, 0xac, 0x3a, 0xdb, 0xb0, 0xb5, - 0x00, 0xb7, 0x57, 0x76, 0x7e, 0xaa, 0xc0, 0xc6, 0x55, 0x79, 0xfa, 0x2e, 0x54, 0x6d, 0xa7, 0xcc, - 0x00, 0x9a, 0x07, 0x3b, 0x81, 0x9d, 0x57, 0x58, 0x36, 0x0c, 0x73, 0xb0, 0x10, 0xfa, 0x11, 0xac, - 0xb1, 0x91, 0x16, 0x27, 0x7c, 0x80, 0x20, 0x6c, 0x7d, 0x49, 0xb1, 0x33, 0x9b, 0xa7, 0xdd, 0xb4, - 0x48, 0xbc, 0x9f, 0x7e, 0x0b, 0xdb, 0xfc, 0x84, 0xc5, 0x39, 0xae, 0xe5, 0xfd, 0x72, 0xfd, 0xbc, - 0x15, 0xbc, 0xb2, 0x15, 0xd8, 0x05, 0x0d, 0xca, 0x05, 0x0d, 0x2e, 0x11, 0x47, 0x8d, 0xc7, 0x4f, - 0x77, 0x9d, 0x47, 0xbf, 0xef, 0x92, 0xf0, 0x3a, 0x01, 0xda, 0x07, 0x3a, 0x77, 0xdf, 0x2e, 0xd6, - 0xde, 0x5b, 0x45, 0xd9, 0x9b, 0x2f, 0xc8, 0x96, 0x00, 0xab, 0xfa, 0xa3, 0x51, 0xbd, 0x86, 0xde, - 0xf9, 0xad, 0x62, 0x27, 0x35, 0xef, 0xd1, 0xdb, 0xb0, 0x6a, 0x4a, 0x2c, 0x5a, 0x74, 0x63, 0xa1, - 0x45, 0x58, 0x2a, 0x06, 0xe9, 0x0e, 0x54, 0x95, 0x61, 0x78, 0x95, 0x36, 0xe9, 0xba, 0xa1, 0x35, - 0xe8, 0xeb, 0x50, 0x3b, 0xe6, 0x2c, 0xd6, 0xc7, 0x58, 0xac, 0x1b, 0x16, 0x16, 0xbd, 0x05, 0x6e, - 0xcc, 0x94, 0xbe, 0x23, 0x65, 0x2a, 0x31, 0x61, 0x37, 0x9c, 0x3b, 0xcc, 0x6a, 0x5c, 0x2e, 0xe2, - 0xe2, 0x6a, 0xe0, 0x96, 0x2d, 0xac, 0x86, 0x05, 0xfd, 0x53, 0x7b, 0x6b, 0xaf, 0xa6, 0xbd, 0xf5, - 0xff, 0xd6, 0xde, 0x5f, 0xaa, 0xb0, 0x71, 0xb5, 0x8e, 0x79, 0xeb, 0xc8, 0x62, 0xeb, 0x26, 0x50, - 0x8b, 0xd9, 0x90, 0xc7, 0xe5, 0x9e, 0x6d, 0x07, 0xa3, 0x54, 0x6a, 0xfe, 0x30, 0x1b, 0x06, 0x5f, - 0x19, 0xff, 0x3d, 0x26, 0xe4, 0xd1, 0xc7, 0xe6, 0xae, 0x5f, 0x9f, 0xee, 0xbe, 0xff, 0x32, 0x6f, - 0x98, 0xe5, 0xf5, 0xc6, 0x2c, 0xd3, 0x5c, 0x86, 0x85, 0x3a, 0xcd, 0xa0, 0xc9, 0x92, 0x24, 0xd5, - 0x98, 0x9e, 0xc2, 0x17, 0xe3, 0xff, 0xbf, 0x6c, 0xf1, 0x0a, 0x53, 0xaf, 0xe9, 0x0b, 0xc7, 0xc1, - 0x93, 0xd0, 0x1a, 0xb4, 0x07, 0x6e, 0xf1, 0x75, 0x31, 0x8d, 0x0f, 0xd0, 0xcb, 0xce, 0xae, 0x61, - 0x69, 0x3d, 0x4d, 0x3f, 0x83, 0xc6, 0x44, 0x48, 0x3e, 0x36, 0x0a, 0xcb, 0x4c, 0xbf, 0x8e, 0xac, - 0x9e, 0xa6, 0x77, 0xa0, 0x29, 0xb9, 0x4a, 0xe3, 0x13, 0xab, 0x51, 0x5f, 0x42, 0x03, 0x4a, 0x62, - 0x4f, 0xd3, 0xbb, 0xb0, 0x66, 0x96, 0x79, 0xa0, 0x78, 0xa2, 0x8d, 0x4e, 0x63, 0x19, 0x1d, 0xc3, - 0xec, 0xf3, 0x44, 0xdb, 0x74, 0x4e, 0x58, 0x2c, 0xc6, 0x83, 0x3c, 0xd1, 0x22, 0xf6, 0xdc, 0x65, - 0x64, 0x90, 0xf8, 0x8d, 0xe1, 0xd1, 0x7b, 0xb0, 0xf5, 0x80, 0xf3, 0x6c, 0x30, 0x11, 0x52, 0x24, - 0xd1, 0x40, 0x89, 0x64, 0xc4, 0x3d, 0x58, 0x42, 0xec, 0x86, 0xa1, 0xdf, 0x45, 0x76, 0xdf, 0x90, - 0x0f, 0xbe, 0x87, 0xaa, 0xf9, 0xfc, 0x25, 0x3d, 0xb4, 0x07, 0x45, 0xb7, 0xaf, 0xf9, 0xcf, 0xd5, - 0xda, 0xb9, 0xea, 0x2c, 0x5e, 0x61, 0x87, 0x7e, 0x0e, 0xee, 0xe5, 0xe3, 0x4c, 0xdf, 0x28, 0x40, - 0xcf, 0xbf, 0xee, 0x2d, 0xef, 0xc5, 0x40, 0xa9, 0x70, 0x74, 0x78, 0x76, 0xee, 0x3b, 0x4f, 0xce, - 0x7d, 0xe7, 0xd9, 0xb9, 0x4f, 0x7e, 0x98, 0xf9, 0xe4, 0xe7, 0x99, 0x4f, 0x1e, 0xcf, 0x7c, 0x72, - 0x36, 0xf3, 0xc9, 0x1f, 0x33, 0x9f, 0xfc, 0x39, 0xf3, 0x9d, 0x67, 0x33, 0x9f, 0x3c, 0xba, 0xf0, - 0x9d, 0xb3, 0x0b, 0xdf, 0x79, 0x72, 0xe1, 0x3b, 0xc3, 0x1a, 0x56, 0xf9, 0xc1, 0xdf, 0x01, 0x00, - 0x00, 0xff, 0xff, 0x1e, 0x1c, 0xe1, 0x9c, 0x76, 0x08, 0x00, 0x00, + // 924 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x55, 0xcd, 0x6e, 0x23, 0x45, + 0x10, 0x9e, 0x71, 0xe2, 0x9f, 0x29, 0x27, 0xd9, 0xa4, 0x13, 0x60, 0xd6, 0x2c, 0x13, 0xcb, 0x08, + 0xc9, 0x42, 0xca, 0x04, 0x42, 0x04, 0x42, 0x42, 0x80, 0xa3, 0xdd, 0x45, 0x48, 0x08, 0xad, 0xc6, + 0x81, 0xeb, 0xa8, 0x6d, 0xb7, 0x27, 0xa3, 0xcc, 0x1f, 0xdd, 0x3d, 0x91, 0x73, 0x82, 0x47, 0xd8, + 0x23, 0x67, 0x4e, 0xbc, 0x01, 0x77, 0x4e, 0x7b, 0xcc, 0x71, 0x85, 0xd0, 0x42, 0x9c, 0x0b, 0xc7, + 0x7d, 0x04, 0xd4, 0xd5, 0x33, 0xb1, 0xbd, 0x1b, 0x50, 0x2c, 0xd8, 0x4b, 0xdc, 0xf5, 0xf3, 0x7d, + 0xd5, 0xf5, 0x55, 0x4d, 0x07, 0x9a, 0x3c, 0x8f, 0x18, 0x77, 0x33, 0x9e, 0xca, 0x94, 0x54, 0xd1, + 0x68, 0xed, 0x05, 0xa1, 0x3c, 0xc9, 0x07, 0xee, 0x30, 0x8d, 0xf7, 0x83, 0x34, 0x48, 0xf7, 0x31, + 0x3a, 0xc8, 0xc7, 0x68, 0xa1, 0x81, 0x27, 0x8d, 0x6a, 0x39, 0x41, 0x9a, 0x06, 0x11, 0x9b, 0x65, + 0x8d, 0x72, 0x4e, 0x65, 0x98, 0x26, 0x45, 0x7c, 0xf7, 0xc5, 0xb8, 0x0c, 0x63, 0x26, 0x24, 0x8d, + 0xb3, 0x22, 0xe1, 0xbd, 0xf9, 0x7a, 0x9c, 0x8e, 0x69, 0x42, 0xf7, 0xe3, 0x30, 0x0e, 0xf9, 0x7e, + 0x76, 0x1a, 0xe8, 0x53, 0x36, 0xd0, 0xbf, 0x05, 0xe2, 0xc3, 0x7f, 0x45, 0x60, 0x17, 0xf8, 0x57, + 0x64, 0x03, 0xfd, 0xab, 0x71, 0x9d, 0x5f, 0x2a, 0xb0, 0xe6, 0x29, 0xdb, 0x63, 0xdf, 0xe5, 0x4c, + 0x48, 0x72, 0x08, 0xb5, 0x71, 0x18, 0x49, 0xc6, 0x6d, 0xb3, 0x6d, 0x76, 0x37, 0x0e, 0xee, 0xb9, + 0x5a, 0x8f, 0xf9, 0x24, 0x34, 0x8e, 0xcf, 0x33, 0xe6, 0x15, 0xb9, 0xe4, 0x4d, 0xb0, 0x54, 0x9a, + 0x9f, 0xd0, 0x98, 0xd9, 0x95, 0xf6, 0x4a, 0xd7, 0xf2, 0x1a, 0xca, 0xf1, 0x35, 0x8d, 0x19, 0x79, + 0x0b, 0x00, 0x83, 0x01, 0x4f, 0xf3, 0xcc, 0x5e, 0xc1, 0x28, 0xa6, 0x7f, 0xa1, 0x1c, 0x84, 0xc0, + 0xea, 0x38, 0x8c, 0x98, 0xbd, 0x8a, 0x01, 0x3c, 0x93, 0x77, 0x60, 0x83, 0x4d, 0x86, 0x51, 0x3e, + 0x62, 0x3e, 0x8d, 0x18, 0x97, 0xc2, 0xae, 0xb6, 0xcd, 0x6e, 0xc3, 0x5b, 0x2f, 0xbc, 0x3d, 0x74, + 0x2a, 0xe6, 0x98, 0x4e, 0x34, 0xb1, 0xb0, 0x6b, 0x6d, 0xb3, 0x5b, 0xf5, 0xac, 0x98, 0x4e, 0x90, + 0x18, 0xc3, 0x09, 0x9b, 0x48, 0x5f, 0xa6, 0xa7, 0x2c, 0xb1, 0xeb, 0x6d, 0x53, 0x15, 0x56, 0x9e, + 0x63, 0xe5, 0xe8, 0x7c, 0x02, 0x8d, 0xb2, 0x11, 0xd2, 0x84, 0x7a, 0x2f, 0x39, 0x57, 0xe6, 0xa6, + 0x41, 0x36, 0x61, 0x0d, 0x0b, 0x84, 0x49, 0x80, 0x1e, 0x93, 0x6c, 0xc1, 0xba, 0xc7, 0x86, 0x29, + 0x1f, 0x95, 0xae, 0x4a, 0xe7, 0x53, 0x58, 0x2f, 0x34, 0x11, 0x59, 0x9a, 0x08, 0x46, 0xf6, 0xa0, + 0x56, 0x5c, 0xc4, 0x6c, 0xaf, 0x74, 0x9b, 0x07, 0xaf, 0x15, 0xca, 0xe1, 0x65, 0xfa, 0x92, 0x4a, + 0x76, 0x9f, 0x89, 0xa1, 0x57, 0x24, 0x75, 0xf6, 0x60, 0xb3, 0x7f, 0x9e, 0x0c, 0x17, 0xc4, 0xbf, + 0x0b, 0x8d, 0x5c, 0x30, 0xee, 0x87, 0x23, 0x4d, 0x62, 0x79, 0x75, 0x65, 0x7f, 0x39, 0x12, 0x9d, + 0x6d, 0xd8, 0x9a, 0x4b, 0xd7, 0x25, 0x3b, 0x3f, 0x55, 0x60, 0x63, 0x91, 0x9e, 0xbc, 0x0b, 0x55, + 0xad, 0xb3, 0x1a, 0x5f, 0xf3, 0x60, 0xc7, 0xd5, 0xd3, 0xf6, 0x4a, 0xb9, 0xf1, 0x0e, 0x3a, 0x85, + 0x7c, 0x04, 0x6b, 0x74, 0x28, 0xc3, 0x33, 0xe6, 0x63, 0x12, 0x0e, 0xae, 0x84, 0xe8, 0x89, 0xcf, + 0xae, 0xdd, 0xd4, 0x99, 0x58, 0x9f, 0x7c, 0x0b, 0xdb, 0xec, 0x8c, 0x46, 0x39, 0x2e, 0xf5, 0x71, + 0xb9, 0xbc, 0xf6, 0x0a, 0x96, 0x6c, 0xb9, 0x7a, 0xbd, 0xdd, 0x72, 0xbd, 0xdd, 0xeb, 0x8c, 0xa3, + 0xc6, 0x93, 0x67, 0xbb, 0xc6, 0xe3, 0x3f, 0x76, 0x4d, 0xef, 0x26, 0x02, 0xd2, 0x07, 0x32, 0x73, + 0xdf, 0x2f, 0x3e, 0x1a, 0x7b, 0x15, 0x69, 0xef, 0xbe, 0x44, 0x5b, 0x26, 0x68, 0xd6, 0x1f, 0x15, + 0xeb, 0x0d, 0xf0, 0xce, 0xef, 0x15, 0x3d, 0xa9, 0x99, 0x46, 0x6f, 0xc3, 0xaa, 0x6a, 0xb1, 0x90, + 0xe8, 0xce, 0x9c, 0x44, 0xd8, 0x2a, 0x06, 0xc9, 0x0e, 0x54, 0x85, 0x42, 0xd8, 0x15, 0xdc, 0x1b, + 0x6d, 0x90, 0xd7, 0xa1, 0x76, 0xc2, 0x68, 0x24, 0x4f, 0xb0, 0x59, 0xcb, 0x2b, 0x2c, 0x72, 0x0f, + 0xac, 0x88, 0x0a, 0xf9, 0x80, 0xf3, 0x94, 0xe3, 0x85, 0x2d, 0x6f, 0xe6, 0x50, 0xab, 0x71, 0xbd, + 0xc6, 0xf3, 0xab, 0x81, 0x5b, 0x36, 0xb7, 0x1a, 0x3a, 0xe9, 0x9f, 0xe4, 0xad, 0xbd, 0x1a, 0x79, + 0xeb, 0xff, 0x4d, 0xde, 0x5f, 0xab, 0xb0, 0xb1, 0xd8, 0xc7, 0x4c, 0x3a, 0x73, 0x5e, 0xba, 0x31, + 0xd4, 0x22, 0x3a, 0x60, 0x51, 0xb9, 0x67, 0xdb, 0xee, 0x30, 0xe5, 0x92, 0x4d, 0xb2, 0x81, 0xfb, + 0x95, 0xf2, 0x3f, 0xa2, 0x21, 0x3f, 0xfa, 0x58, 0xd5, 0xfa, 0xed, 0xd9, 0xee, 0xfb, 0xb7, 0x79, + 0x01, 0x35, 0xae, 0x37, 0xa2, 0x99, 0x64, 0xdc, 0x2b, 0xd8, 0x49, 0x06, 0x4d, 0x9a, 0x24, 0xa9, + 0xc4, 0xeb, 0x09, 0x7c, 0x6f, 0xfe, 0xff, 0x62, 0xf3, 0x25, 0x54, 0xbf, 0x4a, 0x17, 0x86, 0x83, + 0x37, 0x3d, 0x6d, 0x90, 0x1e, 0x58, 0xc5, 0xd7, 0x45, 0x25, 0x3e, 0x5f, 0xb7, 0x9d, 0x5d, 0x43, + 0xc3, 0x7a, 0x92, 0x7c, 0x06, 0x8d, 0x71, 0xc8, 0xd9, 0x48, 0x31, 0x2c, 0x33, 0xfd, 0x3a, 0xa2, + 0x7a, 0x92, 0x3c, 0x80, 0x26, 0x67, 0x22, 0x8d, 0xce, 0x34, 0x47, 0x7d, 0x09, 0x0e, 0x28, 0x81, + 0x3d, 0x49, 0x1e, 0xc2, 0x9a, 0x5a, 0x66, 0x5f, 0xb0, 0x44, 0x2a, 0x9e, 0xc6, 0x32, 0x3c, 0x0a, + 0xd9, 0x67, 0x89, 0xd4, 0xd7, 0x39, 0xa3, 0x51, 0x38, 0xf2, 0xf3, 0x44, 0x86, 0x91, 0x6d, 0x2d, + 0x43, 0x83, 0xc0, 0x6f, 0x14, 0x8e, 0x3c, 0x82, 0xad, 0x53, 0xc6, 0x32, 0x7f, 0x1c, 0xf2, 0x30, + 0x09, 0x7c, 0x11, 0x26, 0x43, 0x66, 0xc3, 0x12, 0x64, 0x77, 0x14, 0xfc, 0x21, 0xa2, 0xfb, 0x0a, + 0x7c, 0xf0, 0x3d, 0x54, 0xd5, 0xe7, 0xcf, 0xc9, 0xa1, 0x3e, 0x08, 0xb2, 0x7d, 0xc3, 0xff, 0xbd, + 0xd6, 0xce, 0xa2, 0xb3, 0x78, 0x85, 0x0d, 0xf2, 0x39, 0x58, 0xd7, 0x8f, 0x33, 0x79, 0xa3, 0x48, + 0x7a, 0xf1, 0x75, 0x6f, 0xd9, 0x2f, 0x07, 0x4a, 0x86, 0xa3, 0xc3, 0x8b, 0x4b, 0xc7, 0x78, 0x7a, + 0xe9, 0x18, 0xcf, 0x2f, 0x1d, 0xf3, 0x87, 0xa9, 0x63, 0xfe, 0x3c, 0x75, 0xcc, 0x27, 0x53, 0xc7, + 0xbc, 0x98, 0x3a, 0xe6, 0x9f, 0x53, 0xc7, 0xfc, 0x6b, 0xea, 0x18, 0xcf, 0xa7, 0x8e, 0xf9, 0xf8, + 0xca, 0x31, 0x2e, 0xae, 0x1c, 0xe3, 0xe9, 0x95, 0x63, 0x0c, 0x6a, 0xd8, 0xe5, 0x07, 0x7f, 0x07, + 0x00, 0x00, 0xff, 0xff, 0x98, 0xb6, 0x46, 0xf9, 0xb4, 0x08, 0x00, 0x00, } func (x RulesRequest_RuleType) String() string { @@ -652,6 +670,12 @@ func (this *RulesRequest) Equal(that interface{}) bool { if this.ExcludeAlerts != that1.ExcludeAlerts { return false } + if this.MaxGroups != that1.MaxGroups { + return false + } + if this.NextToken != that1.NextToken { + return false + } return true } func (this *RulesResponse) Equal(that interface{}) bool { @@ -883,13 +907,15 @@ func (this *RulesRequest) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 9) + s := make([]string, 0, 11) s = append(s, "&ruler.RulesRequest{") s = append(s, "Filter: "+fmt.Sprintf("%#v", this.Filter)+",\n") s = append(s, "RuleName: "+fmt.Sprintf("%#v", this.RuleName)+",\n") s = append(s, "RuleGroup: "+fmt.Sprintf("%#v", this.RuleGroup)+",\n") s = append(s, "File: "+fmt.Sprintf("%#v", this.File)+",\n") s = append(s, "ExcludeAlerts: "+fmt.Sprintf("%#v", this.ExcludeAlerts)+",\n") + s = append(s, "MaxGroups: "+fmt.Sprintf("%#v", this.MaxGroups)+",\n") + s = append(s, "NextToken: "+fmt.Sprintf("%#v", this.NextToken)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -1129,6 +1155,18 @@ func (m *RulesRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if len(m.NextToken) > 0 { + i -= len(m.NextToken) + copy(dAtA[i:], m.NextToken) + i = encodeVarintRuler(dAtA, i, uint64(len(m.NextToken))) + i-- + dAtA[i] = 0x3a + } + if m.MaxGroups != 0 { + i = encodeVarintRuler(dAtA, i, uint64(m.MaxGroups)) + i-- + dAtA[i] = 0x30 + } if m.ExcludeAlerts { i-- if m.ExcludeAlerts { @@ -1570,6 +1608,13 @@ func (m *RulesRequest) Size() (n int) { if m.ExcludeAlerts { n += 2 } + if m.MaxGroups != 0 { + n += 1 + sovRuler(uint64(m.MaxGroups)) + } + l = len(m.NextToken) + if l > 0 { + n += 1 + l + sovRuler(uint64(l)) + } return n } @@ -1726,6 +1771,8 @@ func (this *RulesRequest) String() string { `RuleGroup:` + fmt.Sprintf("%v", this.RuleGroup) + `,`, `File:` + fmt.Sprintf("%v", this.File) + `,`, `ExcludeAlerts:` + fmt.Sprintf("%v", this.ExcludeAlerts) + `,`, + `MaxGroups:` + fmt.Sprintf("%v", this.MaxGroups) + `,`, + `NextToken:` + fmt.Sprintf("%v", this.NextToken) + `,`, `}`, }, "") return s @@ -1994,6 +2041,57 @@ func (m *RulesRequest) Unmarshal(dAtA []byte) error { } } m.ExcludeAlerts = bool(v != 0) + case 6: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field MaxGroups", wireType) + } + m.MaxGroups = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRuler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.MaxGroups |= int32(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 7: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field NextToken", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRuler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRuler + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthRuler + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.NextToken = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipRuler(dAtA[iNdEx:]) diff --git a/pkg/ruler/ruler.proto b/pkg/ruler/ruler.proto index 5c75445b2ff..0777b1f7250 100644 --- a/pkg/ruler/ruler.proto +++ b/pkg/ruler/ruler.proto @@ -38,6 +38,8 @@ message RulesRequest { repeated string rule_group = 3; repeated string file = 4; bool exclude_alerts = 5; + int32 max_groups = 6; + string next_token = 7; } message RulesResponse { diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index a10c4005f7c..ff52af87bf1 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -562,7 +562,7 @@ func TestGetRules(t *testing.T) { ctx := user.InjectOrgID(ctx, u) for _, r := range rulerAddrMap { - rules, err := r.GetRules(ctx, RulesRequest{Filter: AnyRule}) + rules, _, err := r.GetRules(ctx, RulesRequest{Filter: AnyRule}) require.NoError(t, err) require.Equal(t, len(allRulesByUser[u]), len(rules)) @@ -1145,7 +1145,7 @@ func TestRuler_NotifySyncRulesAsync_ShouldTriggerRulesSyncingOnAllRulersWhenEnab // the per-tenant rules manager gets started asynchronously. for _, ruler := range rulers { test.Poll(t, time.Second, numRuleGroups, func() interface{} { - actualRuleGroups, err := ruler.GetRules(user.InjectOrgID(ctx, userID), RulesRequest{Filter: AnyRule}) + actualRuleGroups, _, err := ruler.GetRules(user.InjectOrgID(ctx, userID), RulesRequest{Filter: AnyRule}) require.NoError(t, err) return len(actualRuleGroups) }) @@ -1168,7 +1168,7 @@ func TestRuler_NotifySyncRulesAsync_ShouldTriggerRulesSyncingOnAllRulersWhenEnab // We use test.Poll() because the rule syncing is asynchronous in each ruler. for _, ruler := range rulers { test.Poll(t, time.Second, numRuleGroups-1, func() interface{} { - actualRuleGroups, err := ruler.GetRules(user.InjectOrgID(ctx, userID), RulesRequest{Filter: AnyRule}) + actualRuleGroups, _, err := ruler.GetRules(user.InjectOrgID(ctx, userID), RulesRequest{Filter: AnyRule}) require.NoError(t, err) return len(actualRuleGroups) }) @@ -1191,7 +1191,7 @@ func TestRuler_NotifySyncRulesAsync_ShouldTriggerRulesSyncingOnAllRulersWhenEnab // the rule syncing is asynchronous in each ruler. for _, ruler := range rulers { test.Poll(t, time.Second, 0, func() interface{} { - actualRuleGroups, err := ruler.GetRules(user.InjectOrgID(ctx, userID), RulesRequest{Filter: AnyRule}) + actualRuleGroups, _, err := ruler.GetRules(user.InjectOrgID(ctx, userID), RulesRequest{Filter: AnyRule}) require.NoError(t, err) return len(actualRuleGroups) }) @@ -1292,7 +1292,7 @@ func TestRuler_NotifySyncRulesAsync_ShouldTriggerRulesSyncingAndCorrectlyHandleT // the per-tenant rules manager gets started asynchronously. for _, ruler := range rulers { test.Poll(t, time.Second, numRuleGroups, func() interface{} { - actualRuleGroups, err := ruler.GetRules(user.InjectOrgID(ctx, userID), RulesRequest{Filter: AnyRule}) + actualRuleGroups, _, err := ruler.GetRules(user.InjectOrgID(ctx, userID), RulesRequest{Filter: AnyRule}) require.NoError(t, err) return len(actualRuleGroups) }) @@ -1331,7 +1331,7 @@ func TestRuler_NotifySyncRulesAsync_ShouldTriggerRulesSyncingAndCorrectlyHandleT // the rule syncing is asynchronous in each ruler. for _, ruler := range rulers { test.Poll(t, time.Second, numRuleGroups, func() interface{} { - actualRuleGroups, err := ruler.GetRules(user.InjectOrgID(ctx, userID), RulesRequest{Filter: AnyRule}) + actualRuleGroups, _, err := ruler.GetRules(user.InjectOrgID(ctx, userID), RulesRequest{Filter: AnyRule}) require.NoError(t, err) return len(actualRuleGroups) }) @@ -1442,7 +1442,7 @@ func TestRuler_NotifySyncRulesAsync_ShouldNotTriggerRulesSyncingOnAllRulersWhenD // GetRules() should return no configured rule groups, because no re-sync happened. for _, ruler := range rulers { - actualRuleGroups, err := ruler.GetRules(user.InjectOrgID(ctx, userID), RulesRequest{Filter: AnyRule}) + actualRuleGroups, _, err := ruler.GetRules(user.InjectOrgID(ctx, userID), RulesRequest{Filter: AnyRule}) require.NoError(t, err) require.Empty(t, actualRuleGroups) } @@ -1584,7 +1584,7 @@ func verifyExpectedDeletedRuleGroupsForUser(t *testing.T, r *Ruler, userID strin t.Run("GetRules()", func(t *testing.T) { // The rules manager updates the rules asynchronously so we need to poll it. test.Poll(t, time.Second, expectedDeleted, func() interface{} { - list, err := r.GetRules(user.InjectOrgID(ctx, userID), RulesRequest{Filter: AnyRule}) + list, _, err := r.GetRules(user.InjectOrgID(ctx, userID), RulesRequest{Filter: AnyRule}) require.NoError(t, err) return len(list) == 0 From c5f3572350d1c1ec323b8d3e7de020a7d8b9deb8 Mon Sep 17 00:00:00 2001 From: fayzal-g Date: Wed, 16 Oct 2024 16:34:21 +0100 Subject: [PATCH 2/3] Address review comments --- pkg/ruler/api.go | 6 ------ pkg/ruler/ruler.go | 27 +++++++++++++++------------ 2 files changed, 15 insertions(+), 18 deletions(-) diff --git a/pkg/ruler/api.go b/pkg/ruler/api.go index a55924ce3a6..b89796a8357 100644 --- a/pkg/ruler/api.go +++ b/pkg/ruler/api.go @@ -11,7 +11,6 @@ import ( "io" "net/http" "net/url" - "sort" "strconv" "strings" "time" @@ -266,11 +265,6 @@ func (a *API) PrometheusRules(w http.ResponseWriter, req *http.Request) { groups = append(groups, &grp) } - // keep data.groups are in order - sort.Slice(groups, func(i, j int) bool { - return groups[i].File < groups[j].File - }) - b, err := json.Marshal(&response{ Status: "success", Data: &RuleDiscovery{RuleGroups: groups, NextToken: token}, diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index bd4d4907058..189d47755f0 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -995,20 +995,21 @@ func (r *Ruler) GetRules(ctx context.Context, req RulesRequest) ([]*GroupStateDe return nil }) - // If the request asks for pagination, we fetch req.MaxGroups number - // of rule groups from each replica. We then merge and sort these and - // take the top k (k = MaxGroups) - if req.MaxGroups > 0 { - slices.SortFunc(merged, func(a, b *GroupStateDesc) int { - nsCmp := strings.Compare(a.Group.Namespace, b.Group.Namespace) - if nsCmp != 0 { - return nsCmp - } + // Sort by namespace and group + slices.SortFunc(merged, func(a, b *GroupStateDesc) int { + nsCmp := strings.Compare(a.Group.Namespace, b.Group.Namespace) + if nsCmp != 0 { + return nsCmp + } - // If Namespaces are equal, check the group names - return strings.Compare(a.Group.Name, b.Group.Name) - }) + // If Namespaces are equal, check the group names + return strings.Compare(a.Group.Name, b.Group.Name) + }) + // If the request asks for pagination, we fetch req.MaxGroups number + // of rule groups from each replica. These are merged and sorted and + // we take the top k (k = MaxGroups) + if req.MaxGroups > 0 { if len(merged) > int(req.MaxGroups) { groupForToken := merged[req.MaxGroups] return merged[:req.MaxGroups], getRuleGroupNextToken(groupForToken.Group.Namespace, groupForToken.Group.Name), err @@ -1106,6 +1107,8 @@ func (r *Ruler) getLocalRules(ctx context.Context, userID string, req RulesReque return nil, errors.Wrap(err, "unable to decode rule filename") } + // If a pagination token is provided, skip past groups until we reach the namespace+group that is + // greater than or equal to the namespace+group used to generate the token. if req.NextToken != "" && !foundToken { if !tokenGreaterThanOrEqual(getRuleGroupNextToken(decodedNamespace, group.Name()), req.NextToken) { continue From 3b5887a23814dde6f7a8834e53b015458129214b Mon Sep 17 00:00:00 2001 From: fayzal-g Date: Thu, 17 Oct 2024 12:51:40 +0100 Subject: [PATCH 3/3] Add integration test with 2 ruler replicas --- integration/e2emimir/client.go | 33 ++++++-- integration/ruler_test.go | 139 +++++++++++++++++++++++++++++++- pkg/ruler/api_test.go | 140 --------------------------------- 3 files changed, 161 insertions(+), 151 deletions(-) diff --git a/integration/e2emimir/client.go b/integration/e2emimir/client.go index 8a86b903395..a435fdd7b9c 100644 --- a/integration/e2emimir/client.go +++ b/integration/e2emimir/client.go @@ -674,11 +674,27 @@ type successResult struct { } // GetPrometheusRules fetches the rules from the Prometheus endpoint /api/v1/rules. -func (c *Client) GetPrometheusRules() ([]*promv1.RuleGroup, error) { +func (c *Client) GetPrometheusRules(maxGroups int, token string) ([]*promv1.RuleGroup, string, error) { + url, err := url.Parse(fmt.Sprintf("http://%s/prometheus/api/v1/rules", c.rulerAddress)) + if err != nil { + return nil, "", err + } + if token != "" { + q := url.Query() + q.Add("next_token", token) + url.RawQuery = q.Encode() + } + + if maxGroups != 0 { + q := url.Query() + q.Add("max_groups", strconv.Itoa(maxGroups)) + url.RawQuery = q.Encode() + } + // Create HTTP request - req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/prometheus/api/v1/rules", c.rulerAddress), nil) + req, err := http.NewRequest("GET", url.String(), nil) if err != nil { - return nil, err + return nil, "", err } req.Header.Set("X-Scope-OrgID", c.orgID) @@ -688,13 +704,13 @@ func (c *Client) GetPrometheusRules() ([]*promv1.RuleGroup, error) { // Execute HTTP request res, err := c.httpClient.Do(req.WithContext(ctx)) if err != nil { - return nil, err + return nil, "", err } defer res.Body.Close() body, err := io.ReadAll(res.Body) if err != nil { - return nil, err + return nil, "", err } // Decode the response. @@ -702,19 +718,20 @@ func (c *Client) GetPrometheusRules() ([]*promv1.RuleGroup, error) { Status string `json:"status"` Data struct { RuleGroups []*promv1.RuleGroup `json:"groups"` + NextToken string `json:"nextToken,omitempty"` } `json:"data"` } decoded := response{} if err := json.Unmarshal(body, &decoded); err != nil { - return nil, err + return nil, "", err } if decoded.Status != "success" { - return nil, fmt.Errorf("unexpected response status '%s'", decoded.Status) + return nil, "", fmt.Errorf("unexpected response status '%s'", decoded.Status) } - return decoded.Data.RuleGroups, nil + return decoded.Data.RuleGroups, decoded.Data.NextToken, nil } // GetRuleGroups gets the configured rule groups from the ruler. diff --git a/integration/ruler_test.go b/integration/ruler_test.go index 7e399bcb69d..cb1aefe30dc 100644 --- a/integration/ruler_test.go +++ b/integration/ruler_test.go @@ -15,6 +15,7 @@ import ( "net/http" "os" "path/filepath" + "slices" "strconv" "strings" "testing" @@ -204,6 +205,138 @@ func TestRulerAPISingleBinary(t *testing.T) { require.NoError(t, mimirRestarted.WaitSumMetrics(e2e.Equals(1), "cortex_ruler_managers_total")) } +func TestRulerAPIRulesPagination(t *testing.T) { + const ( + numNamespaces = 3 + numRuleGroups = 9 + ) + + type NGPair struct { + Namespace string + Group string + } + + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Start dependencies. + consul := e2edb.NewConsul() + minio := e2edb.NewMinio(9000, mimirBucketName) + require.NoError(t, s.StartAndWaitReady(consul, minio)) + + // Configure the ruler. + rulerFlags := mergeFlags( + CommonStorageBackendFlags(), + RulerFlags(), + BlocksStorageFlags(), + RulerShardingFlags(consul.NetworkHTTPEndpoint()), + map[string]string{ + // Disable rule group limit + "-ruler.max-rule-groups-per-tenant": "0", + }, + ) + + // Start rulers. + ruler1 := e2emimir.NewRuler("ruler-1", consul.NetworkHTTPEndpoint(), rulerFlags) + ruler2 := e2emimir.NewRuler("ruler-2", consul.NetworkHTTPEndpoint(), rulerFlags) + rulers := e2emimir.NewCompositeMimirService(ruler1, ruler2) + require.NoError(t, s.StartAndWaitReady(ruler1, ruler2)) + + // Generate and upload rule groups to one of the rulers. + c, err := e2emimir.NewClient("", "", "", ruler1.HTTPEndpoint(), "user-1") + require.NoError(t, err) + + // Generate multiple rule groups, with 1 rule each. Write them in + // reverse order and check that they are sorted when returned. + expectedGroups := make([]NGPair, 0, numRuleGroups) + for i := numRuleGroups - 1; i >= 0; i-- { + var recordNode yaml.Node + var exprNode yaml.Node + + recordNode.SetString(fmt.Sprintf("rule_%d", i)) + exprNode.SetString(strconv.Itoa(i)) + ruleGroupName := fmt.Sprintf("test_%d", i) + + expectedGroups = append(expectedGroups, + NGPair{ + Namespace: fmt.Sprintf("namespace_%d", i/numNamespaces), + Group: ruleGroupName, + }, + ) + + require.NoError(t, c.SetRuleGroup(rulefmt.RuleGroup{ + Name: ruleGroupName, + Interval: 60, + Rules: []rulefmt.RuleNode{{ + Record: recordNode, + Expr: exprNode, + }}, + }, fmt.Sprintf("namespace_%d", i/numNamespaces))) + } + + // Sort expectedGroups as it is currently in reverse order + slices.SortFunc(expectedGroups, func(a, b NGPair) int { + fileCompare := strings.Compare(a.Namespace, b.Namespace) + + // If its 0, then the file names are the same, + // so compare the groups + if fileCompare != 0 { + return fileCompare + } + return strings.Compare(a.Group, b.Group) + }) + + // Wait until rulers have loaded all rules. + require.NoError(t, rulers.WaitSumMetricsWithOptions(e2e.Equals(numRuleGroups), []string{"cortex_prometheus_rule_group_rules"}, e2e.WaitMissingMetrics)) + + // Since rulers have loaded all rules, we expect that rules have been sharded + // between the two rulers. + require.NoError(t, ruler1.WaitSumMetrics(e2e.Less(float64(numRuleGroups)), "cortex_prometheus_rule_group_rules")) + require.NoError(t, ruler2.WaitSumMetrics(e2e.Less(float64(numRuleGroups)), "cortex_prometheus_rule_group_rules")) + + // No page size limit + actualGroups, token, err := c.GetPrometheusRules(0, "") + require.NoError(t, err) + require.Empty(t, token) + require.Len(t, actualGroups, len(expectedGroups)) + for i := 0; i < len(expectedGroups); i++ { + require.Equal(t, expectedGroups[i].Namespace, actualGroups[i].File) + require.Equal(t, expectedGroups[i].Group, actualGroups[i].Name) + } + + // We have 9 groups, keep fetching rules with a group page size of 2. The final + // page should have size 1 and an empty nextToken. Also check the groups are returned + // in order + var nextToken string + returnedGroups := make([]NGPair, 0, len(expectedGroups)) + for i := 0; i < 4; i++ { + gps, token, err := c.GetPrometheusRules(2, nextToken) + require.NoError(t, err) + require.Len(t, gps, 2) + require.NotEmpty(t, token) + + returnedGroups = append(returnedGroups, NGPair{gps[0].File, gps[0].Name}, NGPair{gps[1].File, gps[1].Name}) + nextToken = token + } + gps, token, err := c.GetPrometheusRules(2, nextToken) + require.NoError(t, err) + require.Len(t, gps, 1) + require.Empty(t, token) + returnedGroups = append(returnedGroups, NGPair{gps[0].File, gps[0].Name}) + + // Check the returned rules match the rules written + require.Len(t, returnedGroups, len(expectedGroups)) + for i := 0; i < len(expectedGroups); i++ { + require.Equal(t, expectedGroups[i].Namespace, returnedGroups[i].Namespace) + require.Equal(t, expectedGroups[i].Group, returnedGroups[i].Group) + } + + // Invalid max groups value + _, _, err = c.GetPrometheusRules(-1, "") + require.Error(t, err) +} + func TestRulerEvaluationDelay(t *testing.T) { s, err := e2e.NewScenario(networkName) require.NoError(t, err) @@ -401,7 +534,7 @@ func TestRulerSharding(t *testing.T) { require.NoError(t, ruler2.WaitSumMetrics(e2e.Less(numRulesGroups), "cortex_prometheus_rule_group_rules")) // Fetch the rules and ensure they match the configured ones. - actualGroups, err := c.GetPrometheusRules() + actualGroups, _, err := c.GetPrometheusRules(0, "") require.NoError(t, err) var actualNames []string @@ -1318,7 +1451,7 @@ func TestRuler_RestoreWithLongForPeriod(t *testing.T) { assert.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Greater(evalsForAlertToFire), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WaitMissingMetrics)) // Assert that the alert is firing - rules, err := c.GetPrometheusRules() + rules, _, err := c.GetPrometheusRules(0, "") assert.NoError(t, err) assert.Equal(t, "firing", rules[0].Rules[0].(v1.AlertingRule).State) @@ -1335,7 +1468,7 @@ func TestRuler_RestoreWithLongForPeriod(t *testing.T) { assert.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(evalsToRestoredAlertState), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WaitMissingMetrics)) // Assert the alert is already firing - rules, err = c.GetPrometheusRules() + rules, _, err = c.GetPrometheusRules(0, "") assert.NoError(t, err) assert.Equal(t, "firing", rules[0].Rules[0].(v1.AlertingRule).State) } diff --git a/pkg/ruler/api_test.go b/pkg/ruler/api_test.go index 7e7ddb36b5b..cfce95dce4a 100644 --- a/pkg/ruler/api_test.go +++ b/pkg/ruler/api_test.go @@ -11,12 +11,9 @@ import ( "errors" "fmt" "io" - "math/rand" "net/http" "net/http/httptest" "net/url" - "slices" - "strconv" "strings" "testing" "time" @@ -979,143 +976,6 @@ func TestRuler_PrometheusRules(t *testing.T) { } } -func TestRuler_PrometheusRulesPagination(t *testing.T) { - const ( - userID = "user1" - interval = time.Minute - ) - - ruleGroups := rulespb.RuleGroupList{} - for ns := 0; ns < 3; ns++ { - for group := 0; group < 3; group++ { - g := &rulespb.RuleGroupDesc{ - Name: fmt.Sprintf("test-group-%d", group), - Namespace: fmt.Sprintf("test-namespace-%d", ns), - User: userID, - Rules: []*rulespb.RuleDesc{ - createAlertingRule("testalertingrule", "up < 1"), - }, - Interval: interval, - } - ruleGroups = append(ruleGroups, g) - } - } - - // Shuffle the rules before writing to check the sorting when they're returned - rand.Shuffle(len(ruleGroups), func(i, j int) { ruleGroups[i], ruleGroups[j] = ruleGroups[j], ruleGroups[i] }) - - cfg := defaultRulerConfig(t) - cfg.TenantFederation.Enabled = true - - storageRules := map[string]rulespb.RuleGroupList{ - userID: ruleGroups, - } - - r := prepareRuler(t, cfg, newMockRuleStore(storageRules), withRulerAddrAutomaticMapping(), withLimits(validation.MockDefaultOverrides()), withStart()) - - // Sort them so we can compare against them - slices.SortFunc(ruleGroups, func(a, b *rulespb.RuleGroupDesc) int { - fileCompare := strings.Compare(a.Namespace, b.Namespace) - - // If its 0, then the file names are the same. - // Lets look at the group names in that case. - if fileCompare != 0 { - return fileCompare - } - return strings.Compare(a.Name, b.Name) - }) - - // Rules will be synchronized asynchronously, so we wait until the expected number of rule groups - // has been synched. - test.Poll(t, 5*time.Second, len(ruleGroups), func() interface{} { - ctx := user.InjectOrgID(context.Background(), userID) - rls, _ := r.Rules(ctx, &RulesRequest{}) - return len(rls.Groups) - }) - - a := NewAPI(r, r.store, log.NewNopLogger()) - - getRulesResponse := func(groupSize int, nextToken string) response { - queryParams := "?" + url.Values{ - "max_groups": []string{strconv.Itoa(groupSize)}, - "next_token": []string{nextToken}, - }.Encode() - req := requestFor(t, http.MethodGet, "https://localhost:8080/prometheus/api/v1/rules"+queryParams, nil, userID) - w := httptest.NewRecorder() - a.PrometheusRules(w, req) - - resp := w.Result() - body, _ := io.ReadAll(resp.Body) - - r := response{} - err := json.Unmarshal(body, &r) - require.NoError(t, err) - - return r - } - - getRulesFromResponse := func(resp response) RuleDiscovery { - jsonRules, err := json.Marshal(resp.Data) - require.NoError(t, err) - returnedRules := RuleDiscovery{} - require.NoError(t, json.Unmarshal(jsonRules, &returnedRules)) - - return returnedRules - } - - // No page size limit - resp := getRulesResponse(0, "") - require.Equal(t, "success", resp.Status) - rd := getRulesFromResponse(resp) - require.Len(t, rd.RuleGroups, len(ruleGroups)) - require.Empty(t, rd.NextToken) - - // We have 9 groups, keep fetching rules with a group page size of 2. The final - // page should have size 1 and an empty nextToken. Also check the groups are returned - // in order - var nextToken string - returnedRuleGroups := make([]*RuleGroup, 0, len(ruleGroups)) - for i := 0; i < 4; i++ { - resp := getRulesResponse(2, nextToken) - require.Equal(t, "success", resp.Status) - - rd := getRulesFromResponse(resp) - require.Len(t, rd.RuleGroups, 2) - require.NotEmpty(t, rd.NextToken) - - returnedRuleGroups = append(returnedRuleGroups, rd.RuleGroups[0], rd.RuleGroups[1]) - nextToken = rd.NextToken - } - resp = getRulesResponse(2, nextToken) - require.Equal(t, "success", resp.Status) - - rd = getRulesFromResponse(resp) - require.Len(t, rd.RuleGroups, 1) - require.Empty(t, rd.NextToken) - returnedRuleGroups = append(returnedRuleGroups, rd.RuleGroups[0]) - - // Check the returned rules match the rules written - require.Equal(t, len(ruleGroups), len(returnedRuleGroups)) - for i := 0; i < len(ruleGroups); i++ { - require.Equal(t, ruleGroups[i].Namespace, returnedRuleGroups[i].File) - require.Equal(t, ruleGroups[i].Name, returnedRuleGroups[i].Name) - require.Equal(t, len(ruleGroups[i].Rules), len(returnedRuleGroups[i].Rules)) - for j := 0; j < len(ruleGroups[i].Rules); j++ { - jsonRule, err := json.Marshal(returnedRuleGroups[i].Rules[j]) - require.NoError(t, err) - rule := alertingRule{} - require.NoError(t, json.Unmarshal(jsonRule, &rule)) - require.Equal(t, ruleGroups[i].Rules[j].Alert, rule.Name) - } - } - - // Invalid max groups value - resp = getRulesResponse(-1, "") - require.Equal(t, "error", resp.Status) - require.Equal(t, v1.ErrBadData, resp.ErrorType) - require.Equal(t, "invalid max groups value", resp.Error) -} - func TestRuler_PrometheusAlerts(t *testing.T) { cfg := defaultRulerConfig(t)