Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return server-side total bytes processed statistics as a header through query frontend #9645

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
* `-memberlist.max-concurrent-writes`
* `-memberlist.acquire-writer-timeout`
* [ENHANCEMENT] memberlist: Notifications can now be processed once per interval specified by `-memberlist.notify-interval` to reduce notify storm CPU activity in large clusters. #9594
* [ENHANCEMENT] Return server-side total bytes processed statistics as a header through query frontend. #9645
* [BUGFIX] Fix issue where functions such as `rate()` over native histograms could return incorrect values if a float stale marker was present in the selected range. #9508
* [BUGFIX] Fix issue where negation of native histograms (eg. `-some_native_histogram_series`) did nothing. #9508
* [BUGFIX] Fix issue where `metric might not be a counter, name does not end in _total/_sum/_count/_bucket` annotation would be emitted even if `rate` or `increase` did not have enough samples to compute a result. #9508
Expand Down
5 changes: 5 additions & 0 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ func writeServiceTimingHeader(queryResponseTime time.Duration, headers http.Head
parts := make([]string, 0)
parts = append(parts, statsValue("querier_wall_time", stats.LoadWallTime()))
parts = append(parts, statsValue("response_time", queryResponseTime))
parts = append(parts, statsBytesProcessedValue("bytes_processed", stats.LoadFetchedChunkBytes()+stats.LoadFetchedIndexBytes()))
headers.Set(ServiceTimingHeaderName, strings.Join(parts, ", "))
}
}
Expand All @@ -493,6 +494,10 @@ func statsValue(name string, d time.Duration) string {
return name + ";dur=" + durationInMs
}

func statsBytesProcessedValue(name string, value uint64) string {
return name + "=" + strconv.FormatUint(value, 10)
}

func httpRequestActivity(request *http.Request, userAgent string, requestParams url.Values) string {
tenantID := "(unknown)"
if tenantIDs, err := tenant.TenantIDs(request.Context()); err == nil {
Expand Down
25 changes: 25 additions & 0 deletions pkg/frontend/transport/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
expectedMetrics int
expectedActivity string
expectedReadConsistency string
assertHeaders func(t *testing.T, headers http.Header)
}{
{
name: "handler with stats enabled, POST request with params",
Expand Down Expand Up @@ -284,6 +285,27 @@ func TestHandler_ServeHTTP(t *testing.T) {
expectedActivity: "user:12345 UA: req:GET /api/v1/query query=some_metric&time=42",
expectedReadConsistency: "",
},
{
name: "handler with stats enabled, check ServiceTimingHeader",
cfg: HandlerConfig{QueryStatsEnabled: true, MaxBodySize: 1024},
request: func() *http.Request {
req := httptest.NewRequest(http.MethodPost, "/api/v1/query", strings.NewReader("query=some_metric&time=42"))
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
return req
},
downstreamResponse: makeSuccessfulDownstreamResponse(),
expectedStatusCode: 200,
expectedParams: url.Values{
"query": []string{"some_metric"},
"time": []string{"42"},
},
expectedMetrics: 5,
expectedActivity: "user:12345 UA: req:POST /api/v1/query query=some_metric&time=42",
expectedReadConsistency: "",
assertHeaders: func(t *testing.T, headers http.Header) {
assert.Contains(t, headers.Get(ServiceTimingHeaderName), "bytes_processed=0")
},
},
} {
t.Run(tt.name, func(t *testing.T) {
activityFile := filepath.Join(t.TempDir(), "activity-tracker")
Expand Down Expand Up @@ -370,6 +392,9 @@ func TestHandler_ServeHTTP(t *testing.T) {
if tt.expectedStatusCode >= 200 && tt.expectedStatusCode < 300 {
require.Equal(t, int64(len(responseData)), msg["response_size_bytes"])
}
if tt.assertHeaders != nil {
tt.assertHeaders(t, resp.Header())
}

// Check that the HTTP or Protobuf request parameters are logged.
paramsLogged := 0
Expand Down
Loading