Skip to content

Commit dd5a776

Browse files
authored
Fixed nil pointer error from PushStream in ingester when Push return nil response with an error (#6795)
Signed-off-by: Alex Le <[email protected]>
1 parent 3a4658e commit dd5a776

File tree

2 files changed

+69
-0
lines changed

2 files changed

+69
-0
lines changed

integration/ingester_stream_push_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,3 +116,69 @@ func TestIngesterStreamPushConnection(t *testing.T) {
116116
assertServiceMetricsPrefixes(t, Ingester, ingester2)
117117
assertServiceMetricsPrefixes(t, Ingester, ingester3)
118118
}
119+
120+
func TestIngesterStreamPushConnectionWithError(t *testing.T) {
121+
122+
s, err := e2e.NewScenario(networkName)
123+
require.NoError(t, err)
124+
defer s.Close()
125+
126+
maxGlobalSeriesPerMetric := 300
127+
maxGlobalSeriesPerTenant := 1000
128+
129+
flags := BlocksStorageFlags()
130+
flags["-distributor.use-stream-push"] = "true"
131+
flags["-distributor.replication-factor"] = "1"
132+
flags["-distributor.shard-by-all-labels"] = "true"
133+
flags["-distributor.sharding-strategy"] = "shuffle-sharding"
134+
flags["-distributor.ingestion-tenant-shard-size"] = "1"
135+
flags["-ingester.max-series-per-user"] = "0"
136+
flags["-ingester.max-series-per-metric"] = "0"
137+
flags["-ingester.max-global-series-per-user"] = strconv.Itoa(maxGlobalSeriesPerTenant)
138+
flags["-ingester.max-global-series-per-metric"] = strconv.Itoa(maxGlobalSeriesPerMetric)
139+
flags["-ingester.heartbeat-period"] = "1s"
140+
flags["-ingester.instance-limits.max-series"] = "1" // trigger "max series limit reached" error which returns nil response
141+
142+
// Start dependencies.
143+
consul := e2edb.NewConsul()
144+
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
145+
require.NoError(t, s.StartAndWaitReady(consul, minio))
146+
147+
// Start Cortex components.
148+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
149+
ingester1 := e2ecortex.NewIngester("ingester-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
150+
ingester2 := e2ecortex.NewIngester("ingester-2", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
151+
ingester3 := e2ecortex.NewIngester("ingester-3", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
152+
require.NoError(t, s.StartAndWaitReady(distributor, ingester1, ingester2, ingester3))
153+
154+
// Wait until distributor has updated the ring.
155+
require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
156+
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
157+
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))
158+
159+
// Wait until ingesters have heartbeated the ring after all ingesters were active,
160+
// in order to update the number of instances. Since we have no metric, we have to
161+
// rely on a ugly sleep.
162+
time.Sleep(2 * time.Second)
163+
164+
now := time.Now()
165+
client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", userID)
166+
require.NoError(t, err)
167+
168+
for i := 0; i < 5; i++ {
169+
series, _ := generateSeries("test_limit_per_metric", now,
170+
prompb.Label{
171+
Name: "cardinality",
172+
Value: strconv.Itoa(rand.Int()),
173+
},
174+
)
175+
_, err = client.Push(series)
176+
require.NoError(t, err)
177+
err = ingester1.WaitForRunning()
178+
require.NoError(t, err)
179+
err = ingester2.WaitForRunning()
180+
require.NoError(t, err)
181+
err = ingester3.WaitForRunning()
182+
require.NoError(t, err)
183+
}
184+
}

pkg/ingester/ingester.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1577,6 +1577,9 @@ func (i *Ingester) PushStream(srv client.Ingester_PushStreamServer) error {
15771577
}
15781578
ctx = user.InjectOrgID(ctx, req.TenantID)
15791579
resp, err := i.Push(ctx, req.Request)
1580+
if resp == nil {
1581+
resp = &cortexpb.WriteResponse{}
1582+
}
15801583
resp.Code = http.StatusOK
15811584
if err != nil {
15821585
httpResponse, isGRPCError := httpgrpc.HTTPResponseFromError(err)

0 commit comments

Comments
 (0)