Skip to content

Commit

Permalink
Add integration test with 2 ruler replicas
Browse files Browse the repository at this point in the history
  • Loading branch information
fayzal-g committed Oct 17, 2024
1 parent c5f3572 commit 3b5887a
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 151 deletions.
33 changes: 25 additions & 8 deletions integration/e2emimir/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,11 +674,27 @@ type successResult struct {
}

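// GetPrometheusRules fetches the rules from the Prometheus endpoint /api/v1/rules.
// A non-zero maxGroups is sent as the max_groups query parameter and a non-empty
// token is sent as next_token. It returns the rule groups together with the
// nextToken value from the response (empty when the response carries none).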
func (c *Client) GetPrometheusRules() ([]*promv1.RuleGroup, error) {
func (c *Client) GetPrometheusRules(maxGroups int, token string) ([]*promv1.RuleGroup, string, error) {
url, err := url.Parse(fmt.Sprintf("http://%s/prometheus/api/v1/rules", c.rulerAddress))
if err != nil {
return nil, "", err
}
if token != "" {
q := url.Query()
q.Add("next_token", token)
url.RawQuery = q.Encode()
}

if maxGroups != 0 {
q := url.Query()
q.Add("max_groups", strconv.Itoa(maxGroups))
url.RawQuery = q.Encode()
}

// Create HTTP request
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/prometheus/api/v1/rules", c.rulerAddress), nil)
req, err := http.NewRequest("GET", url.String(), nil)
if err != nil {
return nil, err
return nil, "", err
}
req.Header.Set("X-Scope-OrgID", c.orgID)

Expand All @@ -688,33 +704,34 @@ func (c *Client) GetPrometheusRules() ([]*promv1.RuleGroup, error) {
// Execute HTTP request
res, err := c.httpClient.Do(req.WithContext(ctx))
if err != nil {
return nil, err
return nil, "", err
}
defer res.Body.Close()

body, err := io.ReadAll(res.Body)
if err != nil {
return nil, err
return nil, "", err
}

// Decode the response.
type response struct {
Status string `json:"status"`
Data struct {
RuleGroups []*promv1.RuleGroup `json:"groups"`
NextToken string `json:"nextToken,omitempty"`
} `json:"data"`
}

decoded := response{}
if err := json.Unmarshal(body, &decoded); err != nil {
return nil, err
return nil, "", err
}

if decoded.Status != "success" {
return nil, fmt.Errorf("unexpected response status '%s'", decoded.Status)
return nil, "", fmt.Errorf("unexpected response status '%s'", decoded.Status)
}

return decoded.Data.RuleGroups, nil
return decoded.Data.RuleGroups, decoded.Data.NextToken, nil
}

// GetRuleGroups gets the configured rule groups from the ruler.
Expand Down
139 changes: 136 additions & 3 deletions integration/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"net/http"
"os"
"path/filepath"
"slices"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -204,6 +205,138 @@ func TestRulerAPISingleBinary(t *testing.T) {
require.NoError(t, mimirRestarted.WaitSumMetrics(e2e.Equals(1), "cortex_ruler_managers_total"))
}

// TestRulerAPIRulesPagination starts two ruler replicas, uploads nine rule
// groups spread over three namespaces through the first replica, and checks
// that the Prometheus rules endpoint returns them sorted by namespace and
// group name, both unpaginated and when fetched two groups at a time.
func TestRulerAPIRulesPagination(t *testing.T) {
	const (
		numNamespaces = 3
		numRuleGroups = 9
	)

	// NGPair identifies a rule group by its namespace and group name.
	type NGPair struct {
		Namespace string
		Group     string
	}

	s, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer s.Close()

	// Bring up the services the rulers depend on.
	consul := e2edb.NewConsul()
	minio := e2edb.NewMinio(9000, mimirBucketName)
	require.NoError(t, s.StartAndWaitReady(consul, minio))

	// Ruler configuration shared by both replicas.
	rulerFlags := mergeFlags(
		CommonStorageBackendFlags(),
		RulerFlags(),
		BlocksStorageFlags(),
		RulerShardingFlags(consul.NetworkHTTPEndpoint()),
		map[string]string{
			// Disable rule group limit
			"-ruler.max-rule-groups-per-tenant": "0",
		},
	)

	// Start the two ruler replicas.
	ruler1 := e2emimir.NewRuler("ruler-1", consul.NetworkHTTPEndpoint(), rulerFlags)
	ruler2 := e2emimir.NewRuler("ruler-2", consul.NetworkHTTPEndpoint(), rulerFlags)
	rulers := e2emimir.NewCompositeMimirService(ruler1, ruler2)
	require.NoError(t, s.StartAndWaitReady(ruler1, ruler2))

	// All API calls in this test go through the first replica.
	c, err := e2emimir.NewClient("", "", "", ruler1.HTTPEndpoint(), "user-1")
	require.NoError(t, err)

	// Upload single-rule groups from the highest index down to zero, so the
	// write order is the reverse of the order expected from the API.
	expectedGroups := make([]NGPair, 0, numRuleGroups)
	for idx := numRuleGroups - 1; idx >= 0; idx-- {
		var (
			record yaml.Node
			expr   yaml.Node
		)
		record.SetString(fmt.Sprintf("rule_%d", idx))
		expr.SetString(strconv.Itoa(idx))

		groupName := fmt.Sprintf("test_%d", idx)
		namespace := fmt.Sprintf("namespace_%d", idx/numNamespaces)

		expectedGroups = append(expectedGroups, NGPair{Namespace: namespace, Group: groupName})

		group := rulefmt.RuleGroup{
			Name:     groupName,
			Interval: 60,
			Rules: []rulefmt.RuleNode{{
				Record: record,
				Expr:   expr,
			}},
		}
		require.NoError(t, c.SetRuleGroup(group, namespace))
	}

	// The expectations were collected in reverse; order them by namespace
	// first and by group name within the same namespace.
	slices.SortFunc(expectedGroups, func(a, b NGPair) int {
		if byNamespace := strings.Compare(a.Namespace, b.Namespace); byNamespace != 0 {
			return byNamespace
		}
		return strings.Compare(a.Group, b.Group)
	})

	// Wait until rulers have loaded all rules.
	require.NoError(t, rulers.WaitSumMetricsWithOptions(e2e.Equals(numRuleGroups), []string{"cortex_prometheus_rule_group_rules"}, e2e.WaitMissingMetrics))

	// With every rule loaded, each replica is expected to hold fewer than
	// the total, i.e. the groups are split between the two rulers.
	require.NoError(t, ruler1.WaitSumMetrics(e2e.Less(float64(numRuleGroups)), "cortex_prometheus_rule_group_rules"))
	require.NoError(t, ruler2.WaitSumMetrics(e2e.Less(float64(numRuleGroups)), "cortex_prometheus_rule_group_rules"))

	// Unpaginated request: everything comes back at once, with no token.
	allGroups, firstToken, err := c.GetPrometheusRules(0, "")
	require.NoError(t, err)
	require.Empty(t, firstToken)
	require.Len(t, allGroups, len(expectedGroups))
	for i, want := range expectedGroups {
		require.Equal(t, want.Namespace, allGroups[i].File)
		require.Equal(t, want.Group, allGroups[i].Name)
	}

	// Nine groups fetched two at a time: four full pages, each carrying a
	// token for the next one, followed by a final page with a single group
	// and an empty token. Pages are accumulated to verify ordering below.
	var nextToken string
	returnedGroups := make([]NGPair, 0, len(expectedGroups))
	for page := 0; page < 4; page++ {
		pageGroups, pageToken, err := c.GetPrometheusRules(2, nextToken)
		require.NoError(t, err)
		require.Len(t, pageGroups, 2)
		require.NotEmpty(t, pageToken)

		for _, g := range pageGroups {
			returnedGroups = append(returnedGroups, NGPair{g.File, g.Name})
		}
		nextToken = pageToken
	}

	lastPage, lastToken, err := c.GetPrometheusRules(2, nextToken)
	require.NoError(t, err)
	require.Len(t, lastPage, 1)
	require.Empty(t, lastToken)
	returnedGroups = append(returnedGroups, NGPair{lastPage[0].File, lastPage[0].Name})

	// The concatenated pages must match what was written, in sorted order.
	require.Len(t, returnedGroups, len(expectedGroups))
	for i, want := range expectedGroups {
		require.Equal(t, want.Namespace, returnedGroups[i].Namespace)
		require.Equal(t, want.Group, returnedGroups[i].Group)
	}

	// A negative max groups value must be rejected.
	_, _, err = c.GetPrometheusRules(-1, "")
	require.Error(t, err)
}

func TestRulerEvaluationDelay(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
Expand Down Expand Up @@ -401,7 +534,7 @@ func TestRulerSharding(t *testing.T) {
require.NoError(t, ruler2.WaitSumMetrics(e2e.Less(numRulesGroups), "cortex_prometheus_rule_group_rules"))

// Fetch the rules and ensure they match the configured ones.
actualGroups, err := c.GetPrometheusRules()
actualGroups, _, err := c.GetPrometheusRules(0, "")
require.NoError(t, err)

var actualNames []string
Expand Down Expand Up @@ -1318,7 +1451,7 @@ func TestRuler_RestoreWithLongForPeriod(t *testing.T) {
assert.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Greater(evalsForAlertToFire), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WaitMissingMetrics))

// Assert that the alert is firing
rules, err := c.GetPrometheusRules()
rules, _, err := c.GetPrometheusRules(0, "")
assert.NoError(t, err)
assert.Equal(t, "firing", rules[0].Rules[0].(v1.AlertingRule).State)

Expand All @@ -1335,7 +1468,7 @@ func TestRuler_RestoreWithLongForPeriod(t *testing.T) {
assert.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(evalsToRestoredAlertState), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WaitMissingMetrics))

// Assert the alert is already firing
rules, err = c.GetPrometheusRules()
rules, _, err = c.GetPrometheusRules(0, "")
assert.NoError(t, err)
assert.Equal(t, "firing", rules[0].Rules[0].(v1.AlertingRule).State)
}
Expand Down
Loading

0 comments on commit 3b5887a

Please sign in to comment.