From bcc2146943327d8d6e2852d9f9e8d5795bdf954e Mon Sep 17 00:00:00 2001 From: madhu-reddy-peram Date: Wed, 16 Oct 2024 14:56:26 +0100 Subject: [PATCH 1/5] Return server-side total bytes processed statistics as a header through the query frontend --- pkg/frontend/transport/handler.go | 17 +++++++++++++---- pkg/frontend/transport/handler_test.go | 25 +++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 4 deletions(-) diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index a1914709832..e68ccb82696 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -39,10 +39,11 @@ import ( const ( // StatusClientClosedRequest is the status code for when a client request cancellation of an http request - StatusClientClosedRequest = 499 - ServiceTimingHeaderName = "Server-Timing" - cacheControlHeader = "Cache-Control" - cacheControlLogField = "header_cache_control" + StatusClientClosedRequest = 499 + ServiceTimingHeaderName = "Server-Timing" + cacheControlHeader = "Cache-Control" + cacheControlLogField = "header_cache_control" + ServiceTotalBytesProcessed = "X-Total-Bytes-Processed" ) var ( @@ -246,6 +247,7 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if f.cfg.QueryStatsEnabled { writeServiceTimingHeader(queryResponseTime, hs, queryDetails.QuerierStats) + writeTotalBytesProcessedHeader(hs, queryDetails.QuerierStats) } w.WriteHeader(resp.StatusCode) @@ -488,6 +490,13 @@ func writeServiceTimingHeader(queryResponseTime time.Duration, headers http.Head } } +func writeTotalBytesProcessedHeader(headers http.Header, stats *querier_stats.Stats) { + if stats != nil { + totalBytes := stats.LoadFetchedChunkBytes() + stats.LoadFetchedIndexBytes() + headers.Set(ServiceTotalBytesProcessed, strconv.FormatUint(totalBytes, 10)) + } +} + func statsValue(name string, d time.Duration) string { durationInMs := strconv.FormatFloat(float64(d)/float64(time.Millisecond), 'f', -1, 64) return name + ";dur=" + durationInMs diff --git a/pkg/frontend/transport/handler_test.go b/pkg/frontend/transport/handler_test.go index aaf5a6503f2..0b3ddaaae1d 100644 --- a/pkg/frontend/transport/handler_test.go +++ b/pkg/frontend/transport/handler_test.go @@ -94,6 +94,7 @@ func TestHandler_ServeHTTP(t *testing.T) { expectedMetrics int expectedActivity string expectedReadConsistency string + assertHeaders func(t *testing.T, headers http.Header) }{ { name: "handler with stats enabled, POST request with params", @@ -284,6 +285,27 @@ func TestHandler_ServeHTTP(t *testing.T) { expectedActivity: "user:12345 UA: req:GET /api/v1/query query=some_metric&time=42", expectedReadConsistency: "", }, + { + name: "handler with stats enabled, check ServiceTotalBytesProcessed header", + cfg: HandlerConfig{QueryStatsEnabled: true, MaxBodySize: 1024}, + request: func() *http.Request { + req := httptest.NewRequest(http.MethodPost, "/api/v1/query", strings.NewReader("query=some_metric&time=42")) + req.Header.Add("Content-Type", "application/x-www-form-urlencoded") + return req + }, + downstreamResponse: makeSuccessfulDownstreamResponse(), + expectedStatusCode: 200, + expectedParams: url.Values{ + "query": []string{"some_metric"}, + "time": []string{"42"}, + }, + expectedMetrics: 5, + expectedActivity: "user:12345 UA: req:POST /api/v1/query query=some_metric&time=42", + expectedReadConsistency: "", + assertHeaders: func(t *testing.T, headers http.Header) { + assert.Equal(t, "0", headers.Get(ServiceTotalBytesProcessed)) + }, + }, } { t.Run(tt.name, func(t *testing.T) { activityFile := filepath.Join(t.TempDir(), "activity-tracker") @@ -370,6 +392,9 @@ func TestHandler_ServeHTTP(t *testing.T) { if tt.expectedStatusCode >= 200 && tt.expectedStatusCode < 300 { require.Equal(t, int64(len(responseData)), msg["response_size_bytes"]) } + if tt.assertHeaders != nil { + tt.assertHeaders(t, resp.Header()) + } // Check that the HTTP or Protobuf request parameters are logged. paramsLogged := 0 From d6f988e34f4dc1b54fb9d6149883c376a69ad161 Mon Sep 17 00:00:00 2001 From: madhu-reddy-peram Date: Wed, 16 Oct 2024 15:12:59 +0100 Subject: [PATCH 2/5] update chanlog.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2b1775ea6b6..ea043783468 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,7 @@ * `-memberlist.max-concurrent-writes` * `-memberlist.acquire-writer-timeout` * [ENHANCEMENT] memberlist: Notifications can now be processed once per interval specified by `-memberlist.notify-interval` to reduce notify storm CPU activity in large clusters. #9594 +* [ENHANCEMENT] Return server-side total bytes processed statistics as a header through query frontend. #9645 * [BUGFIX] Fix issue where functions such as `rate()` over native histograms could return incorrect values if a float stale marker was present in the selected range. #9508 * [BUGFIX] Fix issue where negation of native histograms (eg. `-some_native_histogram_series`) did nothing. #9508 * [BUGFIX] Fix issue where `metric might not be a counter, name does not end in _total/_sum/_count/_bucket` annotation would be emitted even if `rate` or `increase` did not have enough samples to compute a result. #9508 From cc5c6472d048f57ac2c05f12b9a8d3b73d1fd52c Mon Sep 17 00:00:00 2001 From: madhu-reddy-peram Date: Wed, 16 Oct 2024 16:03:22 +0100 Subject: [PATCH 3/5] lint error fix in changelog.md --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ea043783468..40c1906b2b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,7 +42,7 @@ * `-memberlist.max-concurrent-writes` * `-memberlist.acquire-writer-timeout` * [ENHANCEMENT] memberlist: Notifications can now be processed once per interval specified by `-memberlist.notify-interval` to reduce notify storm CPU activity in large clusters. #9594 -* [ENHANCEMENT] Return server-side total bytes processed statistics as a header through query frontend. #9645 +* [ENHANCEMENT] Return server-side total bytes processed statistics as a header through query frontend. #9645 * [BUGFIX] Fix issue where functions such as `rate()` over native histograms could return incorrect values if a float stale marker was present in the selected range. #9508 * [BUGFIX] Fix issue where negation of native histograms (eg. `-some_native_histogram_series`) did nothing. #9508 * [BUGFIX] Fix issue where `metric might not be a counter, name does not end in _total/_sum/_count/_bucket` annotation would be emitted even if `rate` or `increase` did not have enough samples to compute a result. #9508 From 92f6c5ce7cf9e994962be547ccacfbf141956d0e Mon Sep 17 00:00:00 2001 From: madhu-reddy-peram Date: Wed, 16 Oct 2024 17:43:21 +0100 Subject: [PATCH 4/5] Add bytes processed stats to existing Server-Timing header --- pkg/frontend/transport/handler.go | 22 +++++++++------------- pkg/frontend/transport/handler_test.go | 4 ++-- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index e68ccb82696..57e7e911457 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -39,11 +39,10 @@ import ( const ( // StatusClientClosedRequest is the status code for when a client request cancellation of an http request - StatusClientClosedRequest = 499 - ServiceTimingHeaderName = "Server-Timing" - cacheControlHeader = "Cache-Control" - cacheControlLogField = "header_cache_control" - ServiceTotalBytesProcessed = "X-Total-Bytes-Processed" + StatusClientClosedRequest = 499 + ServiceTimingHeaderName = "Server-Timing" + cacheControlHeader = "Cache-Control" + cacheControlLogField = "header_cache_control" ) var ( @@ -247,7 +246,6 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if f.cfg.QueryStatsEnabled { writeServiceTimingHeader(queryResponseTime, hs, queryDetails.QuerierStats) - writeTotalBytesProcessedHeader(hs, queryDetails.QuerierStats) } w.WriteHeader(resp.StatusCode) @@ -486,22 +484,20 @@ func writeServiceTimingHeader(queryResponseTime time.Duration, headers http.Head parts := make([]string, 0) parts = append(parts, statsValue("querier_wall_time", stats.LoadWallTime())) parts = append(parts, statsValue("response_time", queryResponseTime)) + parts = append(parts, statsBytesProcessedValue("bytes_processed", stats.LoadFetchedChunkBytes()+stats.LoadFetchedIndexBytes())) headers.Set(ServiceTimingHeaderName, strings.Join(parts, ", ")) } } -func writeTotalBytesProcessedHeader(headers http.Header, stats *querier_stats.Stats) { - if stats != nil { - totalBytes := stats.LoadFetchedChunkBytes() + stats.LoadFetchedIndexBytes() - headers.Set(ServiceTotalBytesProcessed, strconv.FormatUint(totalBytes, 10)) - } -} - func statsValue(name string, d time.Duration) string { durationInMs := strconv.FormatFloat(float64(d)/float64(time.Millisecond), 'f', -1, 64) return name + ";dur=" + durationInMs } +func statsBytesProcessedValue(name string, value uint64) string { + return name + "=" + strconv.FormatUint(value, 10) +} + func httpRequestActivity(request *http.Request, userAgent string, requestParams url.Values) string { tenantID := "(unknown)" if tenantIDs, err := tenant.TenantIDs(request.Context()); err == nil { diff --git a/pkg/frontend/transport/handler_test.go b/pkg/frontend/transport/handler_test.go index 0b3ddaaae1d..b706468a54c 100644 --- a/pkg/frontend/transport/handler_test.go +++ b/pkg/frontend/transport/handler_test.go @@ -286,7 +286,7 @@ func TestHandler_ServeHTTP(t *testing.T) { expectedReadConsistency: "", }, { - name: "handler with stats enabled, check ServiceTotalBytesProcessed header", + name: "handler with stats enabled, check ServiceTimingHeader", cfg: HandlerConfig{QueryStatsEnabled: true, MaxBodySize: 1024}, request: func() *http.Request { req := httptest.NewRequest(http.MethodPost, "/api/v1/query", strings.NewReader("query=some_metric&time=42")) @@ -303,7 +303,7 @@ func TestHandler_ServeHTTP(t *testing.T) { expectedActivity: "user:12345 UA: req:POST /api/v1/query query=some_metric&time=42", expectedReadConsistency: "", assertHeaders: func(t *testing.T, headers http.Header) { - assert.Equal(t, "0", headers.Get(ServiceTotalBytesProcessed)) + assert.Contains(t, headers.Get(ServiceTimingHeaderName), "bytes_processed=0") }, }, } { From 0fbd0e68e4ff68fc3e8ad834f1a0c6b0ff46ff7b Mon Sep 17 00:00:00 2001 From: madhu-reddy-peram Date: Wed, 16 Oct 2024 20:11:23 +0100 Subject: [PATCH 5/5] Fix failing integration tests --- integration/query_frontend_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/query_frontend_test.go b/integration/query_frontend_test.go index d088d6dbea2..c5ba9a0a2ed 100644 --- a/integration/query_frontend_test.go +++ b/integration/query_frontend_test.go @@ -383,7 +383,7 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) { if userID == 0 && cfg.queryStatsEnabled { res, _, err := c.QueryRaw("{instance=~\"hello.*\"}") require.NoError(t, err) - require.Regexp(t, "querier_wall_time;dur=[0-9.]*, response_time;dur=[0-9.]*$", res.Header.Values("Server-Timing")[0]) + require.Regexp(t, "querier_wall_time;dur=[0-9.]*, response_time;dur=[0-9.]*, bytes_processed=[0-9.]*$", res.Header.Values("Server-Timing")[0]) } // Beyond the range of -querier.query-ingesters-within should return nothing. No need to repeat it for each user.