From 4658643059e2327cbc9381c25b001dce0da5f6a5 Mon Sep 17 00:00:00 2001 From: Franco Posa Date: Wed, 28 Aug 2024 12:03:22 -0700 Subject: [PATCH] query scheduler: querier-worker queue priority algo implementation (#9013) Example results of benchmark test `TestMultiDimensionalQueueAlgorithmSlowConsumerEffects`: ``` Results by query component: tree: tenant-querier -> query component round-robin tree, 1 tenant, 10pct slow queries: seconds in queue: [ingester: mean: 0.0499 stddev: 0.03 store-gateway: mean: 0.0029 stddev: 0.00] tree: query component round-robin -> tenant-querier tree, 1 tenant, 10pct slow queries: seconds in queue: [ingester: mean: 0.0960 stddev: 0.03 store-gateway: mean: 0.0017 stddev: 0.00] tree: worker-queue prioritization -> tenant-querier tree, 1 tenant, 10pct slow queries: seconds in queue: [ingester: mean: 0.0162 stddev: 0.01 store-gateway: mean: 0.0098 stddev: 0.01] tree: tenant-querier -> query component round-robin tree, 1 tenant, 25pct slow queries: seconds in queue: [ingester: mean: 0.1912 stddev: 0.06 store-gateway: mean: 0.0806 stddev: 0.07] tree: query component round-robin -> tenant-querier tree, 1 tenant, 25pct slow queries: seconds in queue: [ingester: mean: 0.1942 stddev: 0.06 store-gateway: mean: 0.0894 stddev: 0.08] tree: worker-queue prioritization -> tenant-querier tree, 1 tenant, 25pct slow queries: seconds in queue: [ingester: mean: 0.0132 stddev: 0.01 store-gateway: mean: 0.1086 stddev: 0.08] tree: tenant-querier -> query component round-robin tree, 1 tenant, 50pct slow queries: seconds in queue: [ingester: mean: 0.3635 stddev: 0.18 store-gateway: mean: 0.2155 stddev: 0.15] tree: query component round-robin -> tenant-querier tree, 1 tenant, 50pct slow queries: seconds in queue: [ingester: mean: 0.3618 stddev: 0.18 store-gateway: mean: 0.2165 stddev: 0.15] tree: worker-queue prioritization -> tenant-querier tree, 1 tenant, 50pct slow queries: seconds in queue: [ingester: mean: 0.0096 stddev: 0.00 store-gateway: mean: 0.2264 stddev: 0.15] tree: tenant-querier -> query component round-robin tree, 1 tenant, 75pct slow queries: seconds in queue: [ingester: mean: 0.2160 stddev: 0.15 store-gateway: mean: 0.3468 stddev: 0.23] tree: query component round-robin -> tenant-querier tree, 1 tenant, 75pct slow queries: seconds in queue: [ingester: mean: 0.1921 stddev: 0.14 store-gateway: mean: 0.3580 stddev: 0.23] tree: worker-queue prioritization -> tenant-querier tree, 1 tenant, 75pct slow queries: seconds in queue: [ingester: mean: 0.0049 stddev: 0.00 store-gateway: mean: 0.3402 stddev: 0.22] tree: tenant-querier -> query component round-robin tree, 1 tenant, 90pct slow queries: seconds in queue: [ingester: mean: 0.0528 stddev: 0.05 store-gateway: mean: 0.4271 stddev: 0.27] tree: query component round-robin -> tenant-querier tree, 1 tenant, 90pct slow queries: seconds in queue: [ingester: mean: 0.0601 stddev: 0.06 store-gateway: mean: 0.4246 stddev: 0.27] tree: worker-queue prioritization -> tenant-querier tree, 1 tenant, 90pct slow queries: seconds in queue: [ingester: mean: 0.0022 stddev: 0.00 store-gateway: mean: 0.4241 stddev: 0.27] tree: tenant-querier -> query component round-robin tree, 2 tenants, first with 10pct slow queries, second with 90pct slow queries: seconds in queue: [ingester: mean: 0.2189 stddev: 0.15 store-gateway: mean: 0.2193 stddev: 0.15] tree: query component round-robin -> tenant-querier tree, 2 tenants, first with 10pct slow queries, second with 90pct slow queries: seconds in queue: [ingester: mean: 0.3614 stddev: 0.18 store-gateway: mean: 
0.2177 stddev: 0.15] tree: worker-queue prioritization -> tenant-querier tree, 2 tenants, first with 10pct slow queries, second with 90pct slow queries: seconds in queue: [ingester: mean: 0.0094 stddev: 0.00 store-gateway: mean: 0.2237 stddev: 0.15] tree: tenant-querier -> query component round-robin tree, 2 tenants, first with 25pct slow queries, second with 75pct slow queries: seconds in queue: [ingester: mean: 0.2933 stddev: 0.14 store-gateway: mean: 0.2035 stddev: 0.14] tree: query component round-robin -> tenant-querier tree, 2 tenants, first with 25pct slow queries, second with 75pct slow queries: seconds in queue: [ingester: mean: 0.3626 stddev: 0.18 store-gateway: mean: 0.2141 stddev: 0.15] tree: worker-queue prioritization -> tenant-querier tree, 2 tenants, first with 25pct slow queries, second with 75pct slow queries: seconds in queue: [ingester: mean: 0.0088 stddev: 0.00 store-gateway: mean: 0.2286 stddev: 0.16] tree: tenant-querier -> query component round-robin tree, 2 tenants, first with 50pct slow queries, second with 50pct slow queries: seconds in queue: [ingester: mean: 0.3601 stddev: 0.18 store-gateway: mean: 0.2202 stddev: 0.15] tree: query component round-robin -> tenant-querier tree, 2 tenants, first with 50pct slow queries, second with 50pct slow queries: seconds in queue: [ingester: mean: 0.3633 stddev: 0.18 store-gateway: mean: 0.2152 stddev: 0.15] tree: worker-queue prioritization -> tenant-querier tree, 2 tenants, first with 50pct slow queries, second with 50pct slow queries: seconds in queue: [ingester: mean: 0.0093 stddev: 0.00 store-gateway: mean: 0.2276 stddev: 0.16] Results for ingester-only queries by tenant ID: tree: tenant-querier -> query component round-robin tree, 1 tenant, 10pct slow queries: seconds in queue:[tenant-0: mean: 0.0499 stddev: 0.03] tree: query component round-robin -> tenant-querier tree, 1 tenant, 10pct slow queries: seconds in queue:[tenant-0: mean: 0.0960 stddev: 0.03] tree: worker-queue prioritization -> tenant-querier tree, 1 tenant, 10pct slow queries: seconds in queue:[tenant-0: mean: 0.0162 stddev: 0.01] tree: tenant-querier -> query component round-robin tree, 1 tenant, 25pct slow queries: seconds in queue:[tenant-0: mean: 0.1912 stddev: 0.06] tree: query component round-robin -> tenant-querier tree, 1 tenant, 25pct slow queries: seconds in queue:[tenant-0: mean: 0.1942 stddev: 0.06] tree: worker-queue prioritization -> tenant-querier tree, 1 tenant, 25pct slow queries: seconds in queue:[tenant-0: mean: 0.0132 stddev: 0.01] tree: tenant-querier -> query component round-robin tree, 1 tenant, 50pct slow queries: seconds in queue:[tenant-0: mean: 0.3635 stddev: 0.18] tree: query component round-robin -> tenant-querier tree, 1 tenant, 50pct slow queries: seconds in queue:[tenant-0: mean: 0.3618 stddev: 0.18] tree: worker-queue prioritization -> tenant-querier tree, 1 tenant, 50pct slow queries: seconds in queue:[tenant-0: mean: 0.0096 stddev: 0.00] tree: tenant-querier -> query component round-robin tree, 1 tenant, 75pct slow queries: seconds in queue:[tenant-0: mean: 0.2160 stddev: 0.15] tree: query component round-robin -> tenant-querier tree, 1 tenant, 75pct slow queries: seconds in queue:[tenant-0: mean: 0.1921 stddev: 0.14] tree: worker-queue prioritization -> tenant-querier tree, 1 tenant, 75pct slow queries: seconds in queue:[tenant-0: mean: 0.0049 stddev: 0.00] tree: tenant-querier -> query component round-robin tree, 1 tenant, 90pct slow queries: seconds in queue:[tenant-0: mean: 0.0528 stddev: 0.05] tree: query component 
round-robin -> tenant-querier tree, 1 tenant, 90pct slow queries: seconds in queue:[tenant-0: mean: 0.0601 stddev: 0.06] tree: worker-queue prioritization -> tenant-querier tree, 1 tenant, 90pct slow queries: seconds in queue:[tenant-0: mean: 0.0022 stddev: 0.00] tree: tenant-querier -> query component round-robin tree, 2 tenants, first with 10pct slow queries, second with 90pct slow queries: seconds in queue:[tenant-0: mean: 0.2355 stddev: 0.15 tenant-1: mean: 0.0483 stddev: 0.05] tree: query component round-robin -> tenant-querier tree, 2 tenants, first with 10pct slow queries, second with 90pct slow queries: seconds in queue:[tenant-0: mean: 0.3894 stddev: 0.17 tenant-1: mean: 0.0924 stddev: 0.06] tree: worker-queue prioritization -> tenant-querier tree, 2 tenants, first with 10pct slow queries, second with 90pct slow queries: seconds in queue:[tenant-0: mean: 0.0101 stddev: 0.00 tenant-1: mean: 0.0031 stddev: 0.00] tree: tenant-querier -> query component round-robin tree, 2 tenants, first with 25pct slow queries, second with 75pct slow queries: seconds in queue:[tenant-0: mean: 0.3322 stddev: 0.11 tenant-1: mean: 0.1902 stddev: 0.13] tree: query component round-robin -> tenant-querier tree, 2 tenants, first with 25pct slow queries, second with 75pct slow queries: seconds in queue:[tenant-0: mean: 0.4159 stddev: 0.15 tenant-1: mean: 0.1980 stddev: 0.17] tree: worker-queue prioritization -> tenant-querier tree, 2 tenants, first with 25pct slow queries, second with 75pct slow queries: seconds in queue:[tenant-0: mean: 0.0100 stddev: 0.00 tenant-1: mean: 0.0049 stddev: 0.00] tree: tenant-querier -> query component round-robin tree, 2 tenants, first with 50pct slow queries, second with 50pct slow queries: seconds in queue:[tenant-0: mean: 0.3376 stddev: 0.19 tenant-1: mean: 0.3814 stddev: 0.17] tree: query component round-robin -> tenant-querier tree, 2 tenants, first with 50pct slow queries, second with 50pct slow queries: seconds in queue:[tenant-0: mean: 0.3538 stddev: 0.19 tenant-1: mean: 0.3730 stddev: 0.17] tree: worker-queue prioritization -> tenant-querier tree, 2 tenants, first with 50pct slow queries, second with 50pct slow queries: seconds in queue:[tenant-0: mean: 0.0090 stddev: 0.00 tenant-1: mean: 0.0097 stddev: 0.00] ``` --- pkg/frontend/v1/frontend.go | 9 +- ...ing_algorithm_tree_queue_benchmark_test.go | 499 ++++++++++++++++++ .../queue/query_component_utilization.go | 84 --- .../queue/query_component_utilization_test.go | 274 ---------- pkg/scheduler/queue/queue.go | 191 +++---- pkg/scheduler/queue/queue_test.go | 349 +++++------- pkg/scheduler/queue/tenant_queues.go | 2 +- ...ueue_algo_querier_worker_queue_priority.go | 161 ++++++ ...algo_querier_worker_queue_priority_test.go | 345 ++++++++++++ pkg/scheduler/scheduler.go | 11 +- 10 files changed, 1252 insertions(+), 673 deletions(-) create mode 100644 pkg/scheduler/queue/multi_queuing_algorithm_tree_queue_benchmark_test.go delete mode 100644 pkg/scheduler/queue/query_component_utilization_test.go create mode 100644 pkg/scheduler/queue/tree_queue_algo_querier_worker_queue_priority.go create mode 100644 pkg/scheduler/queue/tree_queue_algo_querier_worker_queue_priority_test.go diff --git a/pkg/frontend/v1/frontend.go b/pkg/frontend/v1/frontend.go index 66b7c8761b5..7e0b52575f1 100644 --- a/pkg/frontend/v1/frontend.go +++ b/pkg/frontend/v1/frontend.go @@ -239,14 +239,15 @@ func (f *Frontend) Process(server frontendv1pb.Frontend_ProcessServer) error { } defer f.requestQueue.SubmitUnregisterQuerierWorkerConn(querierWorkerConn) 
- lastTenantIndex := queue.FirstTenant() + lastTenantIdx := queue.FirstTenant() for { - reqWrapper, idx, err := f.requestQueue.WaitForRequestForQuerier(server.Context(), lastTenantIndex, querierID) + dequeueReq := queue.NewQuerierWorkerDequeueRequest(querierWorkerConn, lastTenantIdx) + reqWrapper, idx, err := f.requestQueue.AwaitRequestForQuerier(dequeueReq) if err != nil { return err } - lastTenantIndex = idx + lastTenantIdx = idx req := reqWrapper.(*request) @@ -266,7 +267,7 @@ func (f *Frontend) Process(server frontendv1pb.Frontend_ProcessServer) error { it's possible that its own queue would perpetually contain only expired requests. */ if req.originalCtx.Err() != nil { - lastTenantIndex = lastTenantIndex.ReuseLastTenant() + lastTenantIdx = lastTenantIdx.ReuseLastTenant() continue } diff --git a/pkg/scheduler/queue/multi_queuing_algorithm_tree_queue_benchmark_test.go b/pkg/scheduler/queue/multi_queuing_algorithm_tree_queue_benchmark_test.go new file mode 100644 index 00000000000..95e7ea8a32a --- /dev/null +++ b/pkg/scheduler/queue/multi_queuing_algorithm_tree_queue_benchmark_test.go @@ -0,0 +1,499 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package queue + +import ( + "context" + "fmt" + "math" + "math/rand" + "slices" + "sync" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const querierForgetDelay = 0 +const maxOutStandingPerTenant = 1000000 + +func weightedRandAdditionalQueueDimension(dimensionWeights map[string]float64) string { + totalWeight := float64(0) + for _, dimensionWeight := range dimensionWeights { + totalWeight += dimensionWeight + } + roundTotalWeight := math.Round(totalWeight*10) / 10 + + if roundTotalWeight != 1.0 { + panic("dimension weights must sum to 1.0") + } + + randInt := rand.Float64() + + sum := float64(0) + for dimension, dimensionWeight := range dimensionWeights { + sum += dimensionWeight + if randInt < sum { + return dimension + } + } + + panic("no dimension selected") +} + +func makeWeightedRandAdditionalQueueDimensionFunc( + tenantDimensionWeights map[string]map[string]float64, +) func(tenantID string) []string { + return func(tenantID string) []string { + return []string{weightedRandAdditionalQueueDimension(tenantDimensionWeights[tenantID])} + } +} + +const slowConsumerQueueDimension = storeGatewayQueueDimension + +func makeQueueConsumeFuncWithSlowQueryComponent( + queue *RequestQueue, + slowConsumerLatency time.Duration, + normalConsumerLatency time.Duration, + report *testScenarioQueueDurationObservations, +) consumeRequest { + return func(request QueryRequest) error { + schedulerRequest := request.(*SchedulerRequest) + queryComponent := schedulerRequest.ExpectedQueryComponentName() + if queryComponent == ingesterAndStoreGatewayQueueDimension { + // we expect the latency of a query hitting both a normal and a slowed-down query component + // will be constrained by the latency of the slowest query component; + // we enqueued with the "both" query component to observe the queue algorithm behavior + // but we re-assign query component here for simplicity in logic & reporting + queryComponent = storeGatewayQueueDimension + } + report.Observe(schedulerRequest.UserID, queryComponent, time.Since(schedulerRequest.EnqueueTime).Seconds()) + + queue.QueryComponentUtilization.MarkRequestSent(schedulerRequest) + if queryComponent == slowConsumerQueueDimension { + 
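// sleeping while holding the querier-worker connection emulates a querier
+			// blocked on a slow query component; the consumer cannot dequeue another
+			// request until its inflight request completes (editorial comment)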
time.Sleep(slowConsumerLatency) + } else { + time.Sleep(normalConsumerLatency) + } + + queue.QueryComponentUtilization.MarkRequestCompleted(schedulerRequest) + return nil + } +} + +type testScenarioQueueDurationObservations struct { + testCaseName string + mu sync.Mutex + tenantIDQueueDurationObservations map[string][]float64 + queryComponentQueueDurationObservations map[string][]float64 +} + +func (o *testScenarioQueueDurationObservations) Observe(tenantID, queryComponent string, queueDuration float64) { + o.mu.Lock() + defer o.mu.Unlock() + + o.queryComponentQueueDurationObservations[queryComponent] = append( + o.queryComponentQueueDurationObservations[queryComponent], queueDuration, + ) + + if queryComponent != slowConsumerQueueDimension { + // when analyzing the statistics for the tenant-specific experience of queue duration, + // we only care about the queue duration for the non-slowed-down query component, + // as in the real-world scenario, queries to a severely slowed-down query component are expected + // to be a lost cause, largely timing out before they can be fully serviced. + o.tenantIDQueueDurationObservations[tenantID] = append( + o.tenantIDQueueDurationObservations[tenantID], queueDuration, + ) + } + +} + +func (o *testScenarioQueueDurationObservations) Report() *testScenarioQueueDurationReport { + var tenantIDs []string + for tenantID := range o.tenantIDQueueDurationObservations { + tenantIDs = append(tenantIDs, tenantID) + } + slices.Sort(tenantIDs) + var queryComponents []string + for queryComponent := range o.queryComponentQueueDurationObservations { + queryComponents = append(queryComponents, queryComponent) + } + slices.Sort(queryComponents) + + report := &testScenarioQueueDurationReport{ + testCaseName: o.testCaseName, + } + + for _, queryComponent := range queryComponents { + meanDur := mean(o.queryComponentQueueDurationObservations[queryComponent]) + stdDevDur := stddev(o.queryComponentQueueDurationObservations[queryComponent], meanDur) + report.queryComponentReports = append( + report.queryComponentReports, + testDimensionReport{ + dimensionName: queryComponent, + meanSecondsInQueue: meanDur, + stdDevSecondsInQueue: stdDevDur, + }, + ) + } + + for _, tenantID := range tenantIDs { + meanDur := mean(o.tenantIDQueueDurationObservations[tenantID]) + stdDevDur := stddev(o.tenantIDQueueDurationObservations[tenantID], meanDur) + report.tenantIDReports = append( + report.tenantIDReports, + testDimensionReport{ + dimensionName: "tenant-" + tenantID, + meanSecondsInQueue: meanDur, + stdDevSecondsInQueue: stdDevDur, + }, + ) + } + + return report +} + +type testScenarioQueueDurationReport struct { + testCaseName string + queryComponentReports []testDimensionReport + tenantIDReports []testDimensionReport +} + +func (r *testScenarioQueueDurationReport) QueryComponentReportString() string { + var queryComponentReports []string + for _, queryComponentReport := range r.queryComponentReports { + queryComponentReports = append( + queryComponentReports, + fmt.Sprintf( + "%s: mean: %.4f stddev: %.2f", + queryComponentReport.dimensionName, + queryComponentReport.meanSecondsInQueue, + queryComponentReport.stdDevSecondsInQueue), + ) + } + return fmt.Sprintf( + "seconds in queue: %v", queryComponentReports, + ) +} + +func (r *testScenarioQueueDurationReport) TenantIDReportString() string { + var tenantIDReports []string + for _, tenantIDReport := range r.tenantIDReports { + tenantIDReports = append( + tenantIDReports, + fmt.Sprintf( + "%s: mean: %.4f stddev: %.2f", + 
tenantIDReport.dimensionName,
+				tenantIDReport.meanSecondsInQueue,
+				tenantIDReport.stdDevSecondsInQueue),
+		)
+	}
+	return fmt.Sprintf(
+		"seconds in queue:%v", tenantIDReports,
+	)
+}
+
+type testDimensionReport struct {
+	dimensionName        string
+	meanSecondsInQueue   float64
+	stdDevSecondsInQueue float64
+}
+
+func mean(numbers []float64) float64 {
+	sum := 0.0
+	for _, number := range numbers {
+		sum += number
+	}
+	return sum / float64(len(numbers))
+}
+
+func stddev(numbers []float64, mean float64) float64 {
+	sumOfSquares := 0.0
+	for _, number := range numbers {
+		sumOfSquares += math.Pow(number-mean, 2)
+	}
+	meanOfSquares := sumOfSquares / float64(len(numbers))
+	return math.Sqrt(meanOfSquares)
+}
+
+// TestMultiDimensionalQueueAlgorithmSlowConsumerEffects emulates a simplified queue slowdown scenario
+// which the scheduler's additional queue dimension features are intended to solve for.
+//
+// In this scenario, one category of queue item causes the queue consumer to slow down, introducing a
+// significant delay while the queue consumer processes it and before the consumer can dequeue the next item.
+// This emulates a situation where one of the query components - the ingesters or store-gateways - is under load.
+//
+// In a single-queue or single-queue-per-tenant scenario, the slow query component will cause the queue to back up.
+// When queue items belonging to the slow category are in the same queue in front of the normal queue items,
+// the normal queue items must wait for all slow queue items to be cleared before they can be serviced.
+// In this way, the degraded performance of the slow query component equally degrades the performance of the
+// queries which *could* be serviced quickly, but are waiting behind the slow queries in the queue.
+//
+// In this benchmark, queries for the store-gateway query component are artificially slowed down.
+// We assume that in such a slowdown scenario, queries hitting both query components (ingester and store-gateway)
+// will be constrained by the latency of the slowest query component, so we treat them as store-gateway queries.
+// The goal of the queueing algorithms is to enable ingester-only queries to continue to be serviced quickly
+// when the store-gateway queries are slow; we are not concerned with de-loading the store-gateways
+// or otherwise improving the performance for the store-gateway queries.
+// Benchmark results should be interpreted primarily in terms of the performance of the ingester-only queries.
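+//
+// As a rough back-of-the-envelope illustration of the head-of-line blocking measured here
+// (editorial example using this test's constants, not measured data): with a single shared queue,
+// 8 consumers, and a ~100ms slow-query latency, 50 slow queries sitting ahead of a fast query
+// delay it by roughly (50 / 8) * 100ms ≈ 625ms, even though the fast query itself needs
+// only ~1ms of service time.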
+func TestMultiDimensionalQueueAlgorithmSlowConsumerEffects(t *testing.T) {
+
+	weightedQueueDimensionTestCases := []struct {
+		name                         string
+		tenantQueueDimensionsWeights map[string]map[string]float64
+	}{
+		// single-tenant scenarios
+		{
+			name: "1 tenant, 10pct slow queries",
+			tenantQueueDimensionsWeights: map[string]map[string]float64{
+				"0": {
+					ingesterQueueDimension:                .90,
+					storeGatewayQueueDimension:            .05,
+					ingesterAndStoreGatewayQueueDimension: .05,
+				},
+			},
+		},
+		{
+			name: "1 tenant, 25pct slow queries",
+			tenantQueueDimensionsWeights: map[string]map[string]float64{
+				"0": {
+					ingesterQueueDimension:                .75,
+					storeGatewayQueueDimension:            .125,
+					ingesterAndStoreGatewayQueueDimension: .125,
+				},
+			},
+		},
+		{
+			name: "1 tenant, 50pct slow queries",
+			tenantQueueDimensionsWeights: map[string]map[string]float64{
+				"0": {
+					ingesterQueueDimension:                .50,
+					storeGatewayQueueDimension:            .25,
+					ingesterAndStoreGatewayQueueDimension: .25,
+				},
+			},
+		},
+		{
+			name: "1 tenant, 75pct slow queries",
+			tenantQueueDimensionsWeights: map[string]map[string]float64{
+				"0": {
+					ingesterQueueDimension:                .25,
+					storeGatewayQueueDimension:            .375,
+					ingesterAndStoreGatewayQueueDimension: .375,
+				},
+			},
+		},
+		{
+			name: "1 tenant, 90pct slow queries",
+			tenantQueueDimensionsWeights: map[string]map[string]float64{
+				"0": {
+					ingesterQueueDimension:                .10,
+					storeGatewayQueueDimension:            .45,
+					ingesterAndStoreGatewayQueueDimension: .45,
+				},
+			},
+		},
+
+		// multi-tenant scenarios
+		{
+			name: "2 tenants, first with 10pct slow queries, second with 90pct slow queries",
+			tenantQueueDimensionsWeights: map[string]map[string]float64{
+				"0": {
+					ingesterQueueDimension:                .90,
+					storeGatewayQueueDimension:            .05,
+					ingesterAndStoreGatewayQueueDimension: .05,
+				},
+				"1": {
+					ingesterQueueDimension:                .10,
+					storeGatewayQueueDimension:            .45,
+					ingesterAndStoreGatewayQueueDimension: .45,
+				},
+			},
+		},
+		{
+			name: "2 tenants, first with 25pct slow queries, second with 75pct slow queries",
+			tenantQueueDimensionsWeights: map[string]map[string]float64{
+				"0": {
+					ingesterQueueDimension:                .75,
+					storeGatewayQueueDimension:            .125,
+					ingesterAndStoreGatewayQueueDimension: .125,
+				},
+				"1": {
+					ingesterQueueDimension:                .25,
+					storeGatewayQueueDimension:            .375,
+					ingesterAndStoreGatewayQueueDimension: .375,
+				},
+			},
+		},
+		{
+			name: "2 tenants, first with 50pct slow queries, second with 50pct slow queries",
+			tenantQueueDimensionsWeights: map[string]map[string]float64{
+				"0": {
+					ingesterQueueDimension:                .50,
+					storeGatewayQueueDimension:            .25,
+					ingesterAndStoreGatewayQueueDimension: .25,
+				},
+				"1": {
+					ingesterQueueDimension:                .50,
+					storeGatewayQueueDimension:            .25,
+					ingesterAndStoreGatewayQueueDimension: .25,
+				},
+			},
+		},
+	}
+
+	maxQueriersPerTenant := 0 // disable shuffle sharding
+
+	// Increase totalRequests to tighten up variations when running locally, but do not commit higher values;
+	// the later test cases with a higher percentage of slow queries will take a long time to run.
+	totalRequests := 1000
+	numProducers := 10
+	numConsumers := 8
+	// Number of workers must be divisible by the number of query component queue types
+	// for even distribution of workers to queues and fair comparison between algorithms.
+	// Query component queue types can number up to 4: ingester, store-gateway, ingester-and-store-gateway, and unknown.
+	// The unknown queue type is not used in this test, but 12 ensures divisibility by 4, 3, 2, and 1.
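+	// e.g. 8 consumers * 12 workers per consumer = 96 querier-worker connections;
+	// 96 divides evenly across 4, 3, 2, or 1 active query component queues
+	// (24, 32, 48, or 96 worker connections per queue, respectively)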
+	numWorkersPerConsumer := 12
+
+	normalConsumerLatency := 1 * time.Millisecond
+	// a slow request taking approximately 100x longer than a fast request seems fair;
+	// an ingester can respond in 0.3 seconds while a slow store-gateway query can take 30 seconds
+	slowConsumerLatency := 100 * time.Millisecond
+
+	var testCaseNames []string
+	testCaseReports := map[string]*testScenarioQueueDurationReport{}
+
+	for _, weightedQueueDimensionTestCase := range weightedQueueDimensionTestCases {
+		numTenants := len(weightedQueueDimensionTestCase.tenantQueueDimensionsWeights)
+
+		tqa := newTenantQuerierAssignments(0)
+
+		nonFlippedRoundRobinTree, err := NewTree(tqa, &roundRobinState{}, &roundRobinState{})
+		require.NoError(t, err)
+
+		flippedRoundRobinTree, err := NewTree(&roundRobinState{}, tqa, &roundRobinState{})
+		require.NoError(t, err)
+
+		querierWorkerPrioritizationTree, err := NewTree(NewQuerierWorkerQueuePriorityAlgo(), tqa, &roundRobinState{})
+		require.NoError(t, err)
+
+		trees := []struct {
+			name string
+			tree Tree
+		}{
+			// keeping these names the same length keeps logged results aligned
+			{
+				"tenant-querier -> query component round-robin tree",
+				nonFlippedRoundRobinTree,
+			},
+			{
+				"query component round-robin -> tenant-querier tree",
+				flippedRoundRobinTree,
+			},
+			{
+				"worker-queue prioritization -> tenant-querier tree",
+				querierWorkerPrioritizationTree,
+			},
+		}
+		for _, tree := range trees {
+			testCaseName := fmt.Sprintf(
+				"tree: %s, %s",
+				tree.name,
+				weightedQueueDimensionTestCase.name,
+			)
+			testCaseObservations := &testScenarioQueueDurationObservations{
+				testCaseName:                            testCaseName,
+				tenantIDQueueDurationObservations:       map[string][]float64{},
+				queryComponentQueueDurationObservations: map[string][]float64{},
+			}
+
+			// only the non-flipped tree uses the old tenant -> query component hierarchy
+			prioritizeQueryComponents := tree.tree != nonFlippedRoundRobinTree
+
+			t.Run(testCaseName, func(t *testing.T) {
+				queue, err := NewRequestQueue(
+					log.NewNopLogger(),
+					maxOutStandingPerTenant,
+					true,
+					prioritizeQueryComponents,
+					querierForgetDelay,
+					promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
+					promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
+					promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}),
+					promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}),
+				)
+				require.NoError(t, err)
+
+				// the NewRequestQueue constructor does not allow passing in a tree or tenantQuerierAssignments,
+				// so we have to override them here to use the same structures as the test case
+				queue.queueBroker.tenantQuerierAssignments = tqa
+				queue.queueBroker.prioritizeQueryComponents = prioritizeQueryComponents
+				queue.queueBroker.tree = tree.tree
+
+				ctx := context.Background()
+				require.NoError(t, queue.starting(ctx))
+				t.Cleanup(func() {
+					require.NoError(t, queue.stop(nil))
+				})
+
+				// configure queue producers to enqueue requests with the query component
+				// randomly assigned according to the distribution defined in the test case
+				queueDimensionFunc := makeWeightedRandAdditionalQueueDimensionFunc(
+					weightedQueueDimensionTestCase.tenantQueueDimensionsWeights,
+				)
+				producersChan, producersErrGroup := makeQueueProducerGroup(
+					queue, maxQueriersPerTenant, totalRequests, numProducers, numTenants, queueDimensionFunc,
+				)
+
+				// configure queue consumers with respective latencies for processing requests
+				// which were assigned the "normal" or "slow" query component
+				consumeFunc := makeQueueConsumeFuncWithSlowQueryComponent(
+					queue, slowConsumerLatency, normalConsumerLatency, testCaseObservations,
+				)
+				queueConsumerErrGroup, startConsumersChan := makeQueueConsumerGroup(
+					context.Background(), queue, totalRequests, numConsumers, numWorkersPerConsumer, consumeFunc,
+				)
+
+				// run queue producers and consumers and wait for completion
+
+				// run producers to fill queue
+				close(producersChan)
+				err = producersErrGroup.Wait()
+				require.NoError(t, err)
+
+				// run consumers until the queue is empty
+				close(startConsumersChan)
+				err = queueConsumerErrGroup.Wait()
+				require.NoError(t, err)
+
+				report := testCaseObservations.Report()
+				t.Logf("%s: %s", testCaseName, report.QueryComponentReportString())
+				t.Logf("%s: %s", testCaseName, report.TenantIDReportString())
+				// collect results in order
+				testCaseNames = append(testCaseNames, testCaseName)
+				testCaseReports[testCaseName] = report
+
+				// ensure everything was dequeued
+				path, val := tree.tree.Dequeue()
+				assert.Nil(t, val)
+				assert.Equal(t, path, QueuePath{})
+			})
+		}
+	}
+
+	t.Log("Results by query component:")
+	for _, testCaseName := range testCaseNames {
+		t.Logf("%s: %s", testCaseName, testCaseReports[testCaseName].QueryComponentReportString())
+	}
+
+	t.Log("Results for ingester-only queries by tenant ID:")
+	for _, testCaseName := range testCaseNames {
+		t.Logf("%s: %s", testCaseName, testCaseReports[testCaseName].TenantIDReportString())
+	}
+
+}
diff --git a/pkg/scheduler/queue/query_component_utilization.go b/pkg/scheduler/queue/query_component_utilization.go
index 320243c1878..a7f66e7c3e6 100644
--- a/pkg/scheduler/queue/query_component_utilization.go
+++ b/pkg/scheduler/queue/query_component_utilization.go
@@ -3,10 +3,8 @@
 package queue
 
 import (
-	"math"
 	"sync"
 
-	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 )
 
@@ -49,10 +47,6 @@ func queryComponentFlags(queryComponentName string) (isIngester, isStoreGateway
 // Query requests utilizing both ingesters and store-gateways are tracked in the atomics for both component,
 // therefore the sum of inflight requests by component is likely to exceed the inflight requests total.
 type QueryComponentUtilization struct {
-	// targetReservedCapacity sets the portion of querier-worker connections we attempt to reserve
-	// for queries to the less-loaded query component when the query queue becomes backlogged.
-	targetReservedCapacity float64
-
 	inflightRequestsMu sync.RWMutex
 	// inflightRequests tracks requests from the time the request was successfully sent to a querier
 	// to the time the request was completed by the querier or failed due to cancel, timeout, or disconnect.
@@ -64,30 +58,11 @@ type QueryComponentUtilization struct {
 	querierInflightRequestsMetric *prometheus.SummaryVec
 }
 
-// DefaultReservedQueryComponentCapacity reserves 1 / 3 of querier-worker connections
-// for the query component utilizing fewer of the available connections.
-// Chosen to represent an even balance between the three possible combinations of query components:
-// ingesters only, store-gateways only, or both ingesters and store-gateways.
-const DefaultReservedQueryComponentCapacity = 0.33
-
-// MaxReservedQueryComponentCapacity is an exclusive upper bound on the targetReservedCapacity.
-// The threshold for a QueryComponent's utilization of querier-worker connections
-// can only be exceeded by one QueryComponent at a time as long as targetReservedCapacity is < 0.5.
-// Therefore, one of the components will always be given the OK to dequeue queries for.
-const MaxReservedQueryComponentCapacity = 0.5 - func NewQueryComponentUtilization( - targetReservedCapacity float64, querierInflightRequestsMetric *prometheus.SummaryVec, ) (*QueryComponentUtilization, error) { - if targetReservedCapacity >= MaxReservedQueryComponentCapacity { - return nil, errors.New("invalid targetReservedCapacity") - } - return &QueryComponentUtilization{ - targetReservedCapacity: targetReservedCapacity, - inflightRequests: map[RequestKey]*SchedulerRequest{}, ingesterInflightRequests: 0, storeGatewayInflightRequests: 0, @@ -97,65 +72,6 @@ func NewQueryComponentUtilization( }, nil } -// ExceedsThresholdForComponentName checks whether a query component has exceeded the capacity utilization threshold. -// This enables the dequeuing algorithm to skip requests for a query component experiencing heavy load, -// reserving querier-worker connection capacity to continue servicing requests for the other component. -// If there are no requests in queue for the other, less-utilized component, the dequeuing algorithm may choose -// to continue servicing requests for the component which has exceeded the reserved capacity threshold. -// -// Capacity utilization for a QueryComponent is defined by the portion of the querier-worker connections -// which are currently in flight processing a query which requires that QueryComponent. -// -// The component name can indicate the usage of one or both of ingesters and store-gateways. -// If both ingesters and store-gateways will be used, this method will flag the threshold as exceeded -// if either ingesters or store-gateways are currently in excess of the reserved capacity. -// -// Capacity reservation only occurs when the queue backlogged, where backlogged is defined as -// (length of the query queue) >= (number of querier-worker connections waiting for a query). -// -// A QueryComponent's utilization is allowed to exceed the reserved capacity when the queue is not backlogged. -// If an influx of queries then creates a backlog, this method will indicate to skip queries for the component. -// As the inflight queries complete or fail, the component's utilization will naturally decrease. -// This method will continue to indicate to skip queries for the component until it is back under the threshold. 
-func (qcl *QueryComponentUtilization) ExceedsThresholdForComponentName( - name string, connectedWorkers, queueLen, waitingWorkers int, -) (bool, QueryComponent) { - if waitingWorkers > queueLen { - // excess querier-worker capacity; no need to reserve any for now - return false, "" - } - if connectedWorkers <= 1 { - // corner case; cannot reserve capacity with only one worker available - return false, "" - } - - // allow the functionality to be turned off via setting targetReservedCapacity to 0 - minReservedConnections := 0 - if qcl.targetReservedCapacity > 0 { - // reserve at least one connection in case (connected workers) * (reserved capacity) is less than one - minReservedConnections = int( - math.Ceil( - math.Max(qcl.targetReservedCapacity*float64(connectedWorkers), 1), - ), - ) - } - - isIngester, isStoreGateway := queryComponentFlags(name) - qcl.inflightRequestsMu.RLock() - defer qcl.inflightRequestsMu.RUnlock() - if isIngester { - if connectedWorkers-(qcl.ingesterInflightRequests) <= minReservedConnections { - return true, Ingester - } - } - if isStoreGateway { - if connectedWorkers-(qcl.storeGatewayInflightRequests) <= minReservedConnections { - return true, StoreGateway - } - } - return false, "" -} - // MarkRequestSent is called when a request is sent to a querier func (qcl *QueryComponentUtilization) MarkRequestSent(req *SchedulerRequest) { if req != nil { diff --git a/pkg/scheduler/queue/query_component_utilization_test.go b/pkg/scheduler/queue/query_component_utilization_test.go deleted file mode 100644 index 44bfb2ec82f..00000000000 --- a/pkg/scheduler/queue/query_component_utilization_test.go +++ /dev/null @@ -1,274 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-only - -package queue - -import ( - "testing" - "time" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/stretchr/testify/require" -) - -func testQuerierInflightRequestsGauge() *prometheus.SummaryVec { - return promauto.With(prometheus.NewPedanticRegistry()).NewSummaryVec( - prometheus.SummaryOpts{ - Name: "test_cortex_query_scheduler_querier_inflight_requests", - Help: "[test] Number of inflight requests being processed on all querier-scheduler connections. Quantile buckets keep track of inflight requests over the last 60s.", - Objectives: map[float64]float64{0.5: 0.05, 0.75: 0.02, 0.8: 0.02, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001}, - MaxAge: time.Minute, - AgeBuckets: 6, - }, - []string{"query_component"}, - ) -} - -func TestExceedsUtilizationThresholdForQueryComponents(t *testing.T) { - connectedWorkers := int64(10) - // with 10 connected workers, if queue len >= waiting workers, - // no component can have more than 6 inflight requests. - testReservedCapacity := 0.4 - - testCases := []struct { - name string - - queueLen int - waitingWorkers int - ingesterInflightRequests int - storeGatewayInflightRequests int - - queryComponentName string - thresholdExceededComponent QueryComponent - }{ - // No queue backlog, threshold met or exceeded cases: we do not signal to skip the request. - // In these cases, one component's inflight requests meet or exceed the utilization threshold, - // but there are more waiting workers than items in the queue. 
- { - name: "ingester only, no backlog, ingester utilization above threshold", - queueLen: 1, - waitingWorkers: 2, - ingesterInflightRequests: 8, - storeGatewayInflightRequests: 0, - queryComponentName: ingesterQueueDimension, - thresholdExceededComponent: "", - }, - { - name: "store-gateway only, no backlog, store-gateway utilization above threshold", - queueLen: 1, - waitingWorkers: 2, - ingesterInflightRequests: 0, - storeGatewayInflightRequests: 8, - queryComponentName: storeGatewayQueueDimension, - thresholdExceededComponent: "", - }, - { - name: "ingester and store-gateway, no backlog, ingester utilization above threshold", - queueLen: 2, - waitingWorkers: 3, - ingesterInflightRequests: 7, - storeGatewayInflightRequests: 0, - queryComponentName: ingesterAndStoreGatewayQueueDimension, - thresholdExceededComponent: "", - }, - { - name: "ingester and store-gateway, no backlog, store gateway utilization above threshold", - queueLen: 2, - waitingWorkers: 3, - ingesterInflightRequests: 0, - storeGatewayInflightRequests: 7, - queryComponentName: ingesterAndStoreGatewayQueueDimension, - thresholdExceededComponent: "", - }, - { - name: "uncategorized queue component, no backlog, ingester utilization above threshold", - queueLen: 3, - waitingWorkers: 4, - ingesterInflightRequests: 6, - storeGatewayInflightRequests: 0, - queryComponentName: "abc", - thresholdExceededComponent: "", - }, - { - name: "uncategorized queue component, no backlog, store gateway utilization above threshold", - queueLen: 3, - waitingWorkers: 4, - ingesterInflightRequests: 0, - storeGatewayInflightRequests: 6, - queryComponentName: "xyz", - thresholdExceededComponent: "", - }, - - // Queue backlog, threshold not exceeded cases: we do not signal to skip the request. - // In these cases, there are more items in the queue than waiting workers, - // but no component's inflight requests meet or exceed the utilization threshold. - { - name: "ingester only, queue backlog, ingester utilization below threshold", - queueLen: 10, - waitingWorkers: 1, - ingesterInflightRequests: 5, - storeGatewayInflightRequests: 4, - queryComponentName: ingesterQueueDimension, - thresholdExceededComponent: "", - }, - { - name: "store-gateway only, queue backlog, store-gateway utilization below threshold", - queueLen: 10, - waitingWorkers: 1, - ingesterInflightRequests: 4, - storeGatewayInflightRequests: 5, - queryComponentName: storeGatewayQueueDimension, - thresholdExceededComponent: "", - }, - { - name: "ingester and store-gateway, queue backlog, all utilization below threshold", - queueLen: 10, - waitingWorkers: 2, - ingesterInflightRequests: 4, - storeGatewayInflightRequests: 4, - queryComponentName: ingesterAndStoreGatewayQueueDimension, - thresholdExceededComponent: "", - }, - { - name: "uncategorized queue component, queue backlog, all utilization below threshold", - queueLen: 10, - waitingWorkers: 2, - ingesterInflightRequests: 4, - storeGatewayInflightRequests: 4, - queryComponentName: "abc", - thresholdExceededComponent: "", - }, - - // Queue backlog, threshold exceeded cases: we signal to skip the request. - // In these cases, there are more items in the queue than waiting workers, - // and one component's inflight requests meet or exceed the utilization threshold. 
- { - name: "ingester only, queue backlog, ingester utilization at threshold", - queueLen: 10, - waitingWorkers: 1, - ingesterInflightRequests: 6, - storeGatewayInflightRequests: 3, - queryComponentName: ingesterQueueDimension, - thresholdExceededComponent: Ingester, - }, - { - name: "store-gateway only, queue backlog, store-gateway utilization at threshold", - queueLen: 10, - waitingWorkers: 1, - ingesterInflightRequests: 3, - storeGatewayInflightRequests: 6, - queryComponentName: storeGatewayQueueDimension, - thresholdExceededComponent: StoreGateway, - }, - { - name: "ingester and store-gateway, queue backlog, ingester utilization above threshold", - queueLen: 10, - waitingWorkers: 1, - ingesterInflightRequests: 6, - storeGatewayInflightRequests: 3, - queryComponentName: ingesterAndStoreGatewayQueueDimension, - thresholdExceededComponent: Ingester, - }, - { - name: "ingester and store-gateway, queue backlog, store gateway utilization above threshold", - queueLen: 10, - waitingWorkers: 1, - ingesterInflightRequests: 3, - storeGatewayInflightRequests: 6, - queryComponentName: ingesterAndStoreGatewayQueueDimension, - thresholdExceededComponent: StoreGateway, - }, - { - name: "uncategorized queue component, queue backlog, ingester utilization above threshold", - queueLen: 10, - waitingWorkers: 1, - ingesterInflightRequests: 7, - storeGatewayInflightRequests: 2, - queryComponentName: "abc", - thresholdExceededComponent: Ingester, - }, - { - name: "uncategorized queue component, queue backlog, store gateway utilization above threshold", - queueLen: 10, - waitingWorkers: 1, - ingesterInflightRequests: 2, - storeGatewayInflightRequests: 7, - queryComponentName: "xyz", - thresholdExceededComponent: StoreGateway, - }, - - // corner cases - { - name: "uncategorized queue component, queue backlog, store gateway utilization above threshold", - queueLen: 10, - waitingWorkers: 1, - ingesterInflightRequests: 2, - storeGatewayInflightRequests: 7, - queryComponentName: "xyz", - thresholdExceededComponent: StoreGateway, - }, - } - - for _, testCase := range testCases { - t.Run(testCase.name, func(t *testing.T) { - var err error - queryComponentUtilization, err := NewQueryComponentUtilization( - testReservedCapacity, testQuerierInflightRequestsGauge(), - ) - require.NoError(t, err) - - disabledComponentUtilization, err := NewQueryComponentUtilization( - 0, testQuerierInflightRequestsGauge(), - ) - require.NoError(t, err) - - for i := 0; i < testCase.ingesterInflightRequests; i++ { - ingesterInflightRequest := &SchedulerRequest{ - FrontendAddr: "frontend-a", - QueryID: uint64(i), - AdditionalQueueDimensions: []string{ingesterQueueDimension}, - } - queryComponentUtilization.MarkRequestSent(ingesterInflightRequest) - } - - for i := 0; i < testCase.storeGatewayInflightRequests; i++ { - storeGatewayInflightRequest := &SchedulerRequest{ - FrontendAddr: "frontend-b", - QueryID: uint64(i), - AdditionalQueueDimensions: []string{storeGatewayQueueDimension}, - } - queryComponentUtilization.MarkRequestSent(storeGatewayInflightRequest) - } - - exceedsThreshold, queryComponent := queryComponentUtilization.ExceedsThresholdForComponentName( - testCase.queryComponentName, - int(connectedWorkers), - testCase.queueLen, - testCase.waitingWorkers, - ) - require.Equal(t, queryComponent, testCase.thresholdExceededComponent) - // we should only return a component when exceedsThreshold is true and vice versa - require.Equal(t, exceedsThreshold, testCase.thresholdExceededComponent != "") - - // with 1 connected worker, a component 
should never be marked as exceeding the threshold - exceedsThreshold, queryComponent = queryComponentUtilization.ExceedsThresholdForComponentName( - testCase.queryComponentName, - 1, - testCase.queueLen, - testCase.waitingWorkers, - ) - require.False(t, exceedsThreshold) - require.Equal(t, queryComponent, QueryComponent("")) - - // a component utilization with reserved capacity 0 disables capacity checks - exceedsThreshold, queryComponent = disabledComponentUtilization.ExceedsThresholdForComponentName( - testCase.queryComponentName, - int(connectedWorkers), - testCase.queueLen, - testCase.waitingWorkers, - ) - require.False(t, exceedsThreshold) - require.Equal(t, queryComponent, QueryComponent("")) - }) - } -} diff --git a/pkg/scheduler/queue/queue.go b/pkg/scheduler/queue/queue.go index f59fdfa1506..b7032f966c6 100644 --- a/pkg/scheduler/queue/queue.go +++ b/pkg/scheduler/queue/queue.go @@ -77,14 +77,16 @@ func (sr *SchedulerRequest) ExpectedQueryComponentName() string { return unknownQueueDimension } -// Request stored into the queue. -type Request interface{} +// QueryRequest represents the items stored in the queue +// which may be a SchedulerRequest when running with the standalone scheduler process, +// or a frontend/v1 request when running with the RequestQueue embedded in the v1 frontend. +type QueryRequest interface{} // QuerierWorkerConn is a connection from the querier-worker to the request queue. // // WorkerID is unique only per querier; querier-1 and querier-2 will both have a WorkerID=0. // WorkerID is derived internally in order to distribute worker connections across queue dimensions. -// Unregistered querier-worker connections are assigned a sentinal unregisteredWorkerID. +// Unregistered querier-worker connections are assigned a sentinel unregisteredWorkerID. // // QuerierWorkerConn is also used when passing querierWorkerOperation messages to update querier connection statuses. // The querierWorkerOperations can be specific to a querier, but not a particular worker connection (notifyShutdown), @@ -112,19 +114,20 @@ func (qwc *QuerierWorkerConn) IsRegistered() bool { // RequestQueue holds incoming requests in queues, split by multiple dimensions based on properties of the request. // Dequeuing selects the next request from an appropriate queue given the state of the system. -// Two separate system states are managed by the RequestQueue and used to select the next request: -// 1. TenantQuerierAssignments +// Two layers of QueueAlgorithms are used by the RequestQueue to select the next queue to dequeue a request from: +// +// - Tenant-Querier Assignments // Tenants with shuffle-sharding enabled by setting maxQueriers > 0 are assigned a subset of queriers. -// The RequestQueue receives waitingQuerierConn messages with QuerierIDs -// in order to dequeue requests from a tenant assigned to that querier. -// 2. QueryComponentUtilization -// Requests sent to queriers are tracked per query component until the requests are completed or failed. -// The RequestQueue will dequeue requests such that one query component does not utilize -// all querier-worker connections while requests for the other query component are waiting. +// The RequestQueue utilizes the querier assignments to only dequeue requests for a tenant assigned to that querier. +// If shuffle-sharding is disabled, requests are dequeued in a fair round-robin fashion across all tenants. 
+// +// - Querier-Worker Queue Priority +// Querier-worker connections are distributed across queue partitions which separate query requests +// based on the query component expected to be utilized to service the query. +// This division prevents a query component experiencing high latency from dominating the utilization +// of querier-worker connections and preventing requests for other query components from being serviced. // -// If no specific behavior is required by TenantQuerierAssignments and QueryComponentUtilization, -// such as when shuffle-sharding is disabled or query component utilization is not a concern, -// requests are dequeued in a fair round-robin fashion across all tenants and query components. +// See each QueueAlgorithm implementation for more details. type RequestQueue struct { services.Service log log.Logger @@ -143,12 +146,12 @@ type RequestQueue struct { stopRequested chan struct{} // Written to by stop() to wake up dispatcherLoop() in response to a stop request. stopCompleted chan struct{} // Closed by dispatcherLoop() after a stop is requested and the dispatcher has stopped. - requestsToEnqueue chan requestToEnqueue - requestsSent chan *SchedulerRequest - requestsCompleted chan *SchedulerRequest - querierWorkerOperations chan *querierWorkerOperation - waitingQuerierConns chan *waitingQuerierConn - waitingQuerierConnsToDispatch *list.List + requestsToEnqueue chan requestToEnqueue + requestsSent chan *SchedulerRequest + requestsCompleted chan *SchedulerRequest + querierWorkerOperations chan *querierWorkerOperation + waitingDequeueRequests chan *QuerierWorkerDequeueRequest + waitingDequeueRequestsToDispatch *list.List // QueryComponentUtilization encapsulates tracking requests from the time they are forwarded to a querier // to the time are completed by the querier or failed due to cancel, timeout, or disconnect. 
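The partitioning idea described in the doc comment above can be sketched as follows. This is an illustrative, hypothetical helper only, not the actual `QuerierWorkerQueuePriorityAlgo` added in `tree_queue_algo_querier_worker_queue_priority.go`; the `queuePartitions` and `preferredPartition` names are invented for the example:

```go
package main

import "fmt"

// the four query-component queue partitions named elsewhere in this patch
var queuePartitions = []string{
	"ingester", "store-gateway", "ingester-and-store-gateway", "unknown",
}

// preferredPartition deterministically maps a querier-worker ID to the queue
// partition it should dequeue from first, spreading the pool of worker
// connections evenly so a slow component can only occupy its own share.
func preferredPartition(workerID int) string {
	return queuePartitions[workerID%len(queuePartitions)]
}

func main() {
	for workerID := 0; workerID < 8; workerID++ {
		fmt.Printf("worker %d dequeues first from %q\n", workerID, preferredPartition(workerID))
	}
}
```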
@@ -228,7 +231,7 @@ func (qwo *querierWorkerOperation) AwaitQuerierWorkerConnUpdate() error { type requestToEnqueue struct { tenantID TenantID - req Request + req QueryRequest maxQueriers int successFn func() errChan chan error @@ -245,7 +248,7 @@ func NewRequestQueue( enqueueDuration prometheus.Histogram, querierInflightRequestsMetric *prometheus.SummaryVec, ) (*RequestQueue, error) { - queryComponentCapacity, err := NewQueryComponentUtilization(DefaultReservedQueryComponentCapacity, querierInflightRequestsMetric) + queryComponentCapacity, err := NewQueryComponentUtilization(querierInflightRequestsMetric) if err != nil { return nil, err } @@ -266,12 +269,12 @@ func NewRequestQueue( stopRequested: make(chan struct{}), stopCompleted: make(chan struct{}), - requestsToEnqueue: make(chan requestToEnqueue), - requestsSent: make(chan *SchedulerRequest), - requestsCompleted: make(chan *SchedulerRequest), - querierWorkerOperations: make(chan *querierWorkerOperation), - waitingQuerierConns: make(chan *waitingQuerierConn), - waitingQuerierConnsToDispatch: list.New(), + requestsToEnqueue: make(chan requestToEnqueue), + requestsSent: make(chan *SchedulerRequest), + requestsCompleted: make(chan *SchedulerRequest), + querierWorkerOperations: make(chan *querierWorkerOperation), + waitingDequeueRequests: make(chan *QuerierWorkerDequeueRequest), + waitingDequeueRequestsToDispatch: list.New(), QueryComponentUtilization: queryComponentCapacity, queueBroker: newQueueBroker(maxOutstandingPerTenant, useMultiAlgoQueue, false, forgetDelay), @@ -334,23 +337,23 @@ func (q *RequestQueue) dispatcherLoop() { if err == nil { needToDispatchQueries = true } - case waitingConn := <-q.waitingQuerierConns: - requestSent := q.trySendNextRequestForQuerier(waitingConn) + case waitingDequeueReq := <-q.waitingDequeueRequests: + requestSent := q.trySendNextRequestForQuerier(waitingDequeueReq) if !requestSent { // No requests available for this querier; add it to the list to try later. - q.waitingQuerierConnsToDispatch.PushBack(waitingConn) + q.waitingDequeueRequestsToDispatch.PushBack(waitingDequeueReq) } } if needToDispatchQueries { - currentElement := q.waitingQuerierConnsToDispatch.Front() + currentElement := q.waitingDequeueRequestsToDispatch.Front() for currentElement != nil && !q.queueBroker.isEmpty() { - call := currentElement.Value.(*waitingQuerierConn) + call := currentElement.Value.(*QuerierWorkerDequeueRequest) nextElement := currentElement.Next() // We have to capture the next element before calling Remove(), as Remove() clears it. if q.trySendNextRequestForQuerier(call) { - q.waitingQuerierConnsToDispatch.Remove(currentElement) + q.waitingDequeueRequestsToDispatch.Remove(currentElement) } currentElement = nextElement @@ -361,11 +364,11 @@ func (q *RequestQueue) dispatcherLoop() { // the queue is empty or until we have no more connected querier workers. 
if stopping && (q.queueBroker.isEmpty() || q.connectedQuerierWorkers.Load() == 0) { // tell any waiting querier connections that nothing is coming - currentElement := q.waitingQuerierConnsToDispatch.Front() + currentElement := q.waitingDequeueRequestsToDispatch.Front() for currentElement != nil { - waitingConn := currentElement.Value.(*waitingQuerierConn) - waitingConn.sendError(ErrStopped) + waitingDequeueReq := currentElement.Value.(*QuerierWorkerDequeueRequest) + waitingDequeueReq.sendError(ErrStopped) currentElement = currentElement.Next() } @@ -408,35 +411,43 @@ func (q *RequestQueue) enqueueRequestInternal(r requestToEnqueue) error { // trySendNextRequestForQuerier attempts to dequeue and send a request for a waiting querier-worker connection. // -// Returns true if the waitingQuerierConn can be removed from the list of waiting connections, -// meaning a requestForQuerier was sent through the waitingQuerierConn's receiving channel +// Returns true if the QuerierWorkerDequeueRequest can be removed from the list of waiting dequeue requests, +// meaning a querierWorkerDequeueResponse was sent through the QuerierWorkerDequeueRequest's receiving channel // or the waiting querier-worker connection's context was canceled. // -// The requestForQuerier message can contain either: +// The querierWorkerDequeueResponse message can contain either: // a) a query request which was successfully dequeued for the querier, or // b) an ErrQuerierShuttingDown indicating the querier has been placed in a graceful shutdown state. -func (q *RequestQueue) trySendNextRequestForQuerier(waitingConn *waitingQuerierConn) (done bool) { - req, tenant, idx, err := q.queueBroker.dequeueRequestForQuerier(waitingConn.lastTenantIndex.last, waitingConn.querierID) +func (q *RequestQueue) trySendNextRequestForQuerier(dequeueReq *QuerierWorkerDequeueRequest) (done bool) { + // TODO: This is a temporary solution to set the current querier-worker for the QuerierWorkerQueuePriorityAlgo. + if itq, ok := q.queueBroker.tree.(*MultiQueuingAlgorithmTreeQueue); ok { + for _, algoState := range itq.algosByDepth { + if qwpAlgo, ok := algoState.(*QuerierWorkerQueuePriorityAlgo); ok { + qwpAlgo.SetCurrentQuerierWorker(dequeueReq.WorkerID) + } + } + } + + req, tenant, idx, err := q.queueBroker.dequeueRequestForQuerier(dequeueReq.lastTenantIndex.last, dequeueReq.QuerierID) if err != nil { - // If this querier has told us it's shutting down, terminate WaitForRequestForQuerier with an error now... - waitingConn.sendError(err) - // ...and remove the waiting WaitForRequestForQuerier waitingConn from our list. + // If this querier has told us it's shutting down, terminate AwaitRequestForQuerier with an error now... + dequeueReq.sendError(err) + // ...and remove the waiting dequeueReq from our list. return true } - waitingConn.lastTenantIndex.last = idx if req == nil { // Nothing available for this querier, try again next time. 
return false } - reqForQuerier := requestForQuerier{ - req: req.req, - lastTenantIndex: waitingConn.lastTenantIndex, + reqForQuerier := querierWorkerDequeueResponse{ + queryRequest: req.req, + lastTenantIndex: TenantIndex{last: idx}, err: nil, } - requestSent := waitingConn.send(reqForQuerier) + requestSent := dequeueReq.sendResponse(reqForQuerier) if requestSent { q.queueLength.WithLabelValues(string(tenant.tenantID)).Dec() } else { @@ -445,7 +456,7 @@ func (q *RequestQueue) trySendNextRequestForQuerier(waitingConn *waitingQuerierC if err != nil { level.Error(q.log).Log( "msg", "failed to re-enqueue query request after dequeue", - "err", err, "tenant", tenant.tenantID, "querier", waitingConn.querierID, + "err", err, "tenant", tenant.tenantID, "querier", dequeueReq.QuerierID, ) } } @@ -460,7 +471,7 @@ func (q *RequestQueue) trySendNextRequestForQuerier(waitingConn *waitingQuerierC // // maxQueriers is tenant-specific value to compute which queriers should handle requests for this tenant. // It is passed to SubmitRequestToEnqueue because the value can change between calls. -func (q *RequestQueue) SubmitRequestToEnqueue(tenantID string, req Request, maxQueriers int, successFn func()) error { +func (q *RequestQueue) SubmitRequestToEnqueue(tenantID string, req QueryRequest, maxQueriers int, successFn func()) error { start := time.Now() defer func() { q.enqueueDuration.Observe(time.Since(start).Seconds()) @@ -482,39 +493,32 @@ func (q *RequestQueue) SubmitRequestToEnqueue(tenantID string, req Request, maxQ } } -// WaitForRequestForQuerier is called by a querier-worker to submit a waitingQuerierConn message to the RequestQueue. +// AwaitRequestForQuerier is called by a querier-worker to submit a QuerierWorkerDequeueRequest message to the RequestQueue. // -// This method blocks until the waitingQuerierConn gets a requestForQuerier message on its receiving channel, +// This method blocks until the QuerierWorkerDequeueRequest gets a querierWorkerDequeueResponse message on its receiving channel, // the querier-worker connection context is canceled, or the RequestQueue service stops. // -// Querier-workers should pass the last TenantIndex received from their previous call to WaitForRequestForQuerier, +// Querier-workers should pass the last TenantIndex received from their previous call to AwaitRequestForQuerier, // which enables the RequestQueue to iterate fairly across all tenants assigned to a querier. // If a querier-worker finds that the query request received for the tenant is already expired, // it can get another request for the same tenant by using TenantIndex.ReuseLastTenant. // Newly-connected querier-workers should pass FirstTenant as the TenantIndex to start iteration from the beginning. 
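+//
+// A typical caller loop, mirroring the frontend.go usage earlier in this patch
+// (illustrative sketch; request processing elided):
+//
+//	lastTenantIdx := FirstTenant()
+//	for {
+//		dequeueReq := NewQuerierWorkerDequeueRequest(querierWorkerConn, lastTenantIdx)
+//		req, idx, err := requestQueue.AwaitRequestForQuerier(dequeueReq)
+//		if err != nil {
+//			return err
+//		}
+//		lastTenantIdx = idx
+//		// ... forward req to the querier-worker ...
+//	}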
-func (q *RequestQueue) WaitForRequestForQuerier(ctx context.Context, last TenantIndex, querierID string) (Request, TenantIndex, error) {
-	waitingConn := &waitingQuerierConn{
-		querierConnCtx:  ctx,
-		querierID:       QuerierID(querierID),
-		lastTenantIndex: last,
-		recvChan:        make(chan requestForQuerier),
-	}
-
+func (q *RequestQueue) AwaitRequestForQuerier(dequeueReq *QuerierWorkerDequeueRequest) (QueryRequest, TenantIndex, error) {
 	// context done case serves as a default case to bail out
 	// if the waiting querier-worker connection's context times out or is canceled,
 	// allowing the dispatcherLoop to proceed with its next iteration
 	select {
-	case q.waitingQuerierConns <- waitingConn:
+	case q.waitingDequeueRequests <- dequeueReq:
 		select {
-		case reqForQuerier := <-waitingConn.recvChan:
-			return reqForQuerier.req, reqForQuerier.lastTenantIndex, reqForQuerier.err
-		case <-ctx.Done():
-			return nil, last, ctx.Err()
+		case reqForQuerier := <-dequeueReq.recvChan:
+			return reqForQuerier.queryRequest, reqForQuerier.lastTenantIndex, reqForQuerier.err
+		case <-dequeueReq.ctx.Done():
+			return nil, dequeueReq.lastTenantIndex, dequeueReq.ctx.Err()
 		}
-	case <-ctx.Done():
-		return nil, last, ctx.Err()
+	case <-dequeueReq.ctx.Done():
+		return nil, dequeueReq.lastTenantIndex, dequeueReq.ctx.Err()
 	case <-q.stopCompleted:
-		return nil, last, ErrStopped
+		return nil, dequeueReq.lastTenantIndex, ErrStopped
 	}
 }
 
@@ -598,7 +602,7 @@ func (q *RequestQueue) processQuerierWorkerOperation(querierWorkerOp *querierWor
 		resharded = q.processUnregisterQuerierWorkerConn(querierWorkerOp.conn)
 	case notifyShutdown:
 		// No cleanup needed here in response to a graceful shutdown; just set querier state to shutting down.
-		// All subsequent waitingQuerierConns for the querier will receive an ErrQuerierShuttingDown.
+		// All subsequent waitingDequeueRequests for the querier will receive an ErrQuerierShuttingDown.
 		// The querier-worker's end of the QuerierLoop will exit once it has received enough errors,
 		// and the Querier connection counts will be decremented as the workers disconnect.
 		resharded = q.queueBroker.notifyQuerierShutdown(querierWorkerOp.conn.QuerierID)
@@ -637,7 +641,7 @@ func (q *RequestQueue) processForgetDisconnectedQueriers() (resharded bool) {
 }
 
 // TenantIndex is an opaque type that allows resuming iteration over tenants
-// between successive calls of RequestQueue.WaitForRequestForQuerier method.
+// between successive calls of the RequestQueue.AwaitRequestForQuerier method.
 type TenantIndex struct {
 	last int
 }
@@ -655,38 +659,45 @@ func FirstTenant() TenantIndex {
 	return TenantIndex{last: -1}
 }
 
-// waitingQuerierConn is a "request" indicating that the querier is ready to receive the next query request.
-// It embeds the unbuffered `recvChan` to receive the requestForQuerier "response" from the RequestQueue.
-// The request/response terminology is avoided in naming to disambiguate with the actual query requests.
-type waitingQuerierConn struct {
-	querierConnCtx  context.Context
-	querierID       QuerierID
+// QuerierWorkerDequeueRequest is a request from a querier-worker which is ready to receive the next query.
+// It embeds the unbuffered `recvChan` to receive the querierWorkerDequeueResponse from the RequestQueue.
+type QuerierWorkerDequeueRequest struct {
+	*QuerierWorkerConn
 	lastTenantIndex TenantIndex
-	recvChan        chan requestForQuerier
+	recvChan        chan querierWorkerDequeueResponse
+}
+
+func NewQuerierWorkerDequeueRequest(querierWorkerConn *QuerierWorkerConn, lastTenantIdx TenantIndex) *QuerierWorkerDequeueRequest {
+	return &QuerierWorkerDequeueRequest{
+		QuerierWorkerConn: querierWorkerConn,
+		lastTenantIndex:   lastTenantIdx,
+		recvChan:          make(chan querierWorkerDequeueResponse),
+	}
 }
 
-// requestForQuerier is the "response" for a waitingQuerierConn, to be written to its receiving channel.
+// querierWorkerDequeueResponse is the response for a QuerierWorkerDequeueRequest,
+// to be written to the dequeue request's receiver channel.
 // Errors are embedded in this response rather than written to a separate error channel
 // so that lastTenantIndex can still be returned back to the querier connection.
-type requestForQuerier struct {
-	req             Request
+type querierWorkerDequeueResponse struct {
+	queryRequest    QueryRequest
 	lastTenantIndex TenantIndex
 	err             error
 }
 
-func (wqc *waitingQuerierConn) sendError(err error) {
-	// querier or request queue may be shutting down; ignore the result from send
+func (wqc *QuerierWorkerDequeueRequest) sendError(err error) {
+	// querier or request queue may be shutting down; ignore the result from sendResponse
 	// as the querier may not receive the message before the context is canceled
-	_ = wqc.send(requestForQuerier{err: err})
+	_ = wqc.sendResponse(querierWorkerDequeueResponse{err: err})
 }
 
-// send sends req to the waitingQuerierConn result channel that is waiting for a new query.
-// Returns true if sending succeeds, or false if req context is timed out or canceled.
-func (wqc *waitingQuerierConn) send(req requestForQuerier) bool {
+// sendResponse sends queryRequest to the receiver channel that is waiting for a new query.
+// Returns true if sending succeeds, or false if the querier-worker connection's context times out or is canceled.
+func (wqc *QuerierWorkerDequeueRequest) sendResponse(req querierWorkerDequeueResponse) bool {
 	select {
 	case wqc.recvChan <- req:
 		return true
-	case <-wqc.querierConnCtx.Done():
+	case <-wqc.ctx.Done():
 		// context done case serves as a default case to bail out
 		// if the waiting querier-worker connection's context times out or is canceled,
 		// allowing the dispatcherLoop to proceed with its next iteration
diff --git a/pkg/scheduler/queue/queue_test.go b/pkg/scheduler/queue/queue_test.go
index a573ea6406e..c8302e71947 100644
--- a/pkg/scheduler/queue/queue_test.go
+++ b/pkg/scheduler/queue/queue_test.go
@@ -9,7 +9,6 @@ import (
 	"fmt"
 	"math/rand"
 	"strconv"
-	"strings"
 	"sync"
 	"testing"
 	"time"
@@ -20,7 +19,6 @@ import (
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
-	promtest "github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"golang.org/x/sync/errgroup"
@@ -93,156 +91,6 @@ func makeSchedulerRequest(tenantID string, additionalQueueDimensions []string) *
 	}
 }
 
-// TestMultiDimensionalQueueFairnessSlowConsumerEffects emulates a simplified queue slowdown scenario
-// which the scheduler's additional queue dimensions features are intended to solve for.
-//
-// In this scenario, one category of queue item causes the queue consumer to slow down, introducing a
-// significant delay while the queue consumer processes it and before the consumer can dequeue the next item.
-// This simulates a situation where one of the query components - the ingesters or store-gateways - is under load. -// -// If queue items belonging to the slow category are in the same queue ("normal-channel") in front of the normal queue -// items, the normal queue items must wait for all slow queue items to be cleared before they can be serviced. -// In this way, the degraded performance of the slow query component equally degrades the performance of the -// queries which *could* be serviced quickly, but are waiting behind the slow queries in the queue. -// -// When using multiple queue dimensions, the queues are split by which "component" the query will utilize -- in this -// test, those components are called "normal-channel" and "slow-channel" for clarity. The queue broker then -// round-robins between the multiple queues, which has the effect of alternately dequeuing from the slow queries -// and normal queries rather than blocking normal queries behind slow queries. -func TestMultiDimensionalQueueFairnessSlowConsumerEffects(t *testing.T) { - for _, tt := range buildTreeTestsStruct() { - // Only test allowed combinations of these configs - t.Run(tt.name, func(t *testing.T) { - promRegistry := prometheus.NewPedanticRegistry() - - maxQueriersPerTenant := 0 // disable shuffle sharding - forgetQuerierDelay := time.Duration(0) - maxOutstandingRequestsPerTenant := 1000 - - totalRequests := 100 - numTenants := 1 - numProducers := 10 - numConsumers := 1 - - normalQueueDimension := "normal-request" - slowConsumerLatency := 20 * time.Millisecond - slowConsumerQueueDimension := "slow-request" - normalQueueDimensionFunc := func(_ bool) []string { return []string{"normal-channel"} } - slowQueueDimensionFunc := func(usingMultipleDimensions bool) []string { - if usingMultipleDimensions { - return []string{"slow-channel"} - } - return []string{"normal-channel"} - } - - useMultipleDimensions := []bool{false, true} - queueDurationTotals := map[bool]map[string]float64{ - false: {normalQueueDimension: 0.0, slowConsumerQueueDimension: 0.0}, - true: {normalQueueDimension: 0.0, slowConsumerQueueDimension: 0.0}, - } - - for _, multipleDimensionsUsed := range useMultipleDimensions { - - // Scheduler code uses a histogram for queue duration, but a counter is a more direct metric - // for this test, as we are concerned with the total or average wait time for all queue items. - // Prometheus histograms also lack support for test assertions via prometheus/testutil. 
- queueDuration := promauto.With(promRegistry).NewCounterVec(prometheus.CounterOpts{ - Name: "test_query_scheduler_queue_duration_total_seconds", - Help: "[test] total time spent by items in queue before getting picked up by a consumer", - }, []string{"additional_queue_dimensions"}) - - queue, err := NewRequestQueue( - log.NewNopLogger(), - maxOutstandingRequestsPerTenant, - tt.useMultiAlgoTreeQueue, - tt.prioritizeQueryComponents, - forgetQuerierDelay, - promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), - promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), - promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), - promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), - ) - require.NoError(t, err) - - ctx := context.Background() - require.NoError(t, queue.starting(ctx)) - t.Cleanup(func() { - require.NoError(t, queue.stop(nil)) - }) - - // fill queue first with the slow queries, then the normal queries - for _, queueDimensionFunc := range []func(bool) []string{slowQueueDimensionFunc, normalQueueDimensionFunc} { - startProducersChan := make(chan struct{}) - producersErrGroup, _ := errgroup.WithContext(ctx) - - runProducer := runQueueProducerIters( - queue, maxQueriersPerTenant, totalRequests/2, numProducers, numTenants, startProducersChan, multipleDimensionsUsed, queueDimensionFunc, - ) - for producerIdx := 0; producerIdx < numProducers; producerIdx++ { - producerIdx := producerIdx - producersErrGroup.Go(func() error { - return runProducer(producerIdx) - }) - } - close(startProducersChan) - err := producersErrGroup.Wait() - require.NoError(t, err) - } - - // emulate delay when consuming the slow queries - consumeFunc := func(request Request) error { - schedulerRequest := request.(*SchedulerRequest) - if schedulerRequest.AdditionalQueueDimensions[0] == slowConsumerQueueDimension { - time.Sleep(slowConsumerLatency) - } - - queueTime := time.Since(schedulerRequest.EnqueueTime) - additionalQueueDimensionLabels := strings.Join(schedulerRequest.AdditionalQueueDimensions, ":") - queueDuration.With(prometheus.Labels{"additional_queue_dimensions": additionalQueueDimensionLabels}).Add(queueTime.Seconds()) - return nil - } - - // consume queries - queueConsumerErrGroup, ctx := errgroup.WithContext(ctx) - startConsumersChan := make(chan struct{}) - runConsumer := runQueueConsumerIters(ctx, queue, totalRequests, numConsumers, startConsumersChan, consumeFunc) - - for consumerIdx := 0; consumerIdx < numConsumers; consumerIdx++ { - consumerIdx := consumerIdx - queueConsumerErrGroup.Go(func() error { - return runConsumer(consumerIdx) - }) - } - - close(startConsumersChan) - err = queueConsumerErrGroup.Wait() - require.NoError(t, err) - - // record total queue duration by queue dimensions and whether the queue splitting was enabled - for _, queueDimension := range []string{normalQueueDimension, slowConsumerQueueDimension} { - queueDurationTotals[multipleDimensionsUsed][queueDimension] = promtest.ToFloat64( - queueDuration.With(prometheus.Labels{"additional_queue_dimensions": queueDimension}), - ) - } - - promRegistry.Unregister(queueDuration) - } - - // total or average time in queue for a normal queue item should be roughly cut in half - // when queue splitting is enabled, as the average normal queue item waits behind - // half of the slow queue items, instead of waiting behind all the slow queue items. 
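The halving claim above can be sanity-checked with the test's own parameters (100 total requests, half of them slow, 20ms of consumer delay per slow item); a back-of-the-envelope editorial sketch, not part of the patch:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// 50 slow items are enqueued ahead of the normal items,
	// each adding 20ms of consumer delay, per the deleted test's setup.
	slowItems := 50
	slowLatency := 20 * time.Millisecond

	// single shared queue: every normal item waits behind all slow items
	withoutSplit := time.Duration(slowItems) * slowLatency
	// split queues with round-robin dequeuing: the average normal item
	// waits behind only about half of the slow items
	withSplit := time.Duration(slowItems/2) * slowLatency

	fmt.Println(withoutSplit, withSplit) // 1s 500ms
}
```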
- expected := queueDurationTotals[false][normalQueueDimension] / 2 - actual := queueDurationTotals[true][normalQueueDimension] - // some variance allowed due to actual time processing needed beyond the slow consumer delay; - // variance is also a function of the number of consumers and the consumer delay chosen. - // variance can be tighter if the test runs longer but there is a tradeoff for testing and CI speed - delta := expected * 0.10 - require.InDelta(t, expected, actual, delta) - }) - } - -} - func BenchmarkConcurrentQueueOperations(b *testing.B) { treeTypes := buildTreeTestsStruct() @@ -276,40 +124,30 @@ func BenchmarkConcurrentQueueOperations(b *testing.B) { ) require.NoError(b, err) - startSignalChan := make(chan struct{}) - queueActorsErrGroup, ctx := errgroup.WithContext(context.Background()) + ctx := context.Background() require.NoError(b, queue.starting(ctx)) b.Cleanup(func() { require.NoError(b, queue.stop(nil)) }) - runProducer := runQueueProducerIters( - queue, maxQueriersPerTenant, b.N, numProducers, numTenants, startSignalChan, true, nil, + startProducersChan, producersErrGroup := makeQueueProducerGroup( + queue, maxQueriersPerTenant, b.N, numProducers, numTenants, nil, ) - for producerIdx := 0; producerIdx < numProducers; producerIdx++ { - producerIdx := producerIdx - queueActorsErrGroup.Go(func() error { - return runProducer(producerIdx) - }) - } + queueConsumerErrGroup, startConsumersChan := makeQueueConsumerGroup( + context.Background(), queue, b.N, numConsumers, 1, nil, + ) - runConsumer := runQueueConsumerIters(ctx, queue, b.N, numConsumers, startSignalChan, nil) + b.ResetTimer() + close(startProducersChan) // run producers + close(startConsumersChan) // run consumers - for consumerIdx := 0; consumerIdx < numConsumers; consumerIdx++ { - consumerIdx := consumerIdx - queueActorsErrGroup.Go(func() error { - return runConsumer(consumerIdx) - }) - } + err = producersErrGroup.Wait() + require.NoError(b, err) - b.ResetTimer() - close(startSignalChan) - err = queueActorsErrGroup.Wait() - if err != nil { - require.NoError(b, err) - } + err = queueConsumerErrGroup.Wait() + require.NoError(b, err) }) } }) @@ -339,6 +177,35 @@ func queueActorIterationCount(totalIters int, numActors int, actorIdx int) int { return actorIters } +func makeQueueProducerGroup( + queue *RequestQueue, + maxQueriersPerTenant int, + totalRequests int, + numProducers int, + numTenants int, + queueDimensionFunc func(string) []string, +) (chan struct{}, *errgroup.Group) { + startProducersChan := make(chan struct{}) + producersErrGroup, _ := errgroup.WithContext(context.Background()) + + runProducer := runQueueProducerIters( + queue, + maxQueriersPerTenant, + totalRequests, + numProducers, + numTenants, + startProducersChan, + queueDimensionFunc, + ) + for producerIdx := 0; producerIdx < numProducers; producerIdx++ { + producerIdx := producerIdx + producersErrGroup.Go(func() error { + return runProducer(producerIdx) + }) + } + return startProducersChan, producersErrGroup +} + func runQueueProducerIters( queue *RequestQueue, maxQueriersPerTenant int, @@ -346,8 +213,7 @@ func runQueueProducerIters( numProducers int, numTenants int, start chan struct{}, - usingMultipleDimensions bool, - additionalQueueDimensionFunc func(bool) []string, + additionalQueueDimensionFunc func(tenantID string) []string, ) func(producerIdx int) error { return func(producerIdx int) error { producerIters := queueActorIterationCount(totalIters, numProducers, producerIdx) @@ -356,7 +222,7 @@ func runQueueProducerIters( <-start for i := 
0; i < producerIters; i++ { - err := queueProduce(queue, maxQueriersPerTenant, tenantIDStr, usingMultipleDimensions, additionalQueueDimensionFunc) + err := queueProduce(queue, maxQueriersPerTenant, tenantIDStr, additionalQueueDimensionFunc) if err != nil { return err } @@ -372,12 +238,11 @@ func queueProduce( queue *RequestQueue, maxQueriersPerTenant int, tenantID string, - usingMultipleDimensions bool, - additionalQueueDimensionFunc func(bool) []string, + additionalQueueDimensionFunc func(tenantID string) []string, ) error { var additionalQueueDimensions []string if additionalQueueDimensionFunc != nil { - additionalQueueDimensions = additionalQueueDimensionFunc(usingMultipleDimensions) + additionalQueueDimensions = additionalQueueDimensionFunc(tenantID) } req := makeSchedulerRequest(tenantID, additionalQueueDimensions) for { @@ -393,55 +258,98 @@ func queueProduce( return nil } -func runQueueConsumerIters( +func makeQueueConsumerGroup( ctx context.Context, queue *RequestQueue, - totalIters int, + totalRequests int, numConsumers int, - start chan struct{}, + numWorkersPerConsumer int, consumeFunc consumeRequest, +) (*errgroup.Group, chan struct{}) { + queueConsumerErrGroup, ctx := errgroup.WithContext(ctx) + consumedRequestsCounter := make(chan struct{}, totalRequests) + startConsumersChan := make(chan struct{}) + stopConsumersChan := make(chan struct{}) + runConsumer := runQueueConsumerUntilEmpty(ctx, totalRequests, queue, consumeFunc, consumedRequestsCounter, startConsumersChan, stopConsumersChan) + + for consumerIdx := 0; consumerIdx < numConsumers; consumerIdx++ { + for workerIdx := 0; workerIdx < numWorkersPerConsumer; workerIdx++ { + consumerIdx := consumerIdx + queueConsumerErrGroup.Go(func() error { + return runConsumer(consumerIdx) + }) + } + } + return queueConsumerErrGroup, startConsumersChan +} + +func runQueueConsumerUntilEmpty( + ctx context.Context, + totalRequests int, + requestQueue *RequestQueue, + consumeFunc consumeRequest, + consumedRequestsCounter chan struct{}, + start chan struct{}, + stop chan struct{}, ) func(consumerIdx int) error { return func(consumerIdx int) error { - consumerIters := queueActorIterationCount(totalIters, numConsumers, consumerIdx) lastTenantIndex := FirstTenant() querierID := fmt.Sprintf("consumer-%v", consumerIdx) + querierWorkerConn := NewUnregisteredQuerierWorkerConn(context.Background(), QuerierID(querierID)) - err := queue.AwaitRegisterQuerierWorkerConn(querierWorkerConn) + err := requestQueue.AwaitRegisterQuerierWorkerConn(querierWorkerConn) if err != nil { return err } - defer queue.SubmitUnregisterQuerierWorkerConn(querierWorkerConn) + defer requestQueue.SubmitUnregisterQuerierWorkerConn(querierWorkerConn) + + consumedRequest := make(chan struct{}) + loopQueueConsume := func() error { + for { + idx, err := queueConsume(requestQueue, querierWorkerConn, lastTenantIndex, consumeFunc) + if err != nil { + return err + } + + consumedRequest <- struct{}{} + lastTenantIndex = idx + } + } + loopQueueConsumeErrGroup, _ := errgroup.WithContext(ctx) <-start + loopQueueConsumeErrGroup.Go(loopQueueConsume) - for i := 0; i < consumerIters; i++ { - idx, err := queueConsume(ctx, queue, querierID, lastTenantIndex, consumeFunc) - if err != nil { - return err + for { + select { + case <-stop: + return nil + case <-consumedRequest: + consumedRequestsCounter <- struct{}{} + if len(consumedRequestsCounter) == totalRequests { + close(stop) + } } - - lastTenantIndex = idx } - - return nil } } -type consumeRequest func(request Request) error +type 
consumeRequest func(request QueryRequest) error func queueConsume( - ctx context.Context, queue *RequestQueue, querierID string, lastTenantIndex TenantIndex, consumeFunc consumeRequest, + queue *RequestQueue, querierWorkerConn *QuerierWorkerConn, lastTenantIdx TenantIndex, consumeFunc consumeRequest, ) (TenantIndex, error) { - request, idx, err := queue.WaitForRequestForQuerier(ctx, lastTenantIndex, querierID) + dequeueReq := NewQuerierWorkerDequeueRequest(querierWorkerConn, lastTenantIdx) + request, idx, err := queue.AwaitRequestForQuerier(dequeueReq) if err != nil { - return lastTenantIndex, err + return lastTenantIdx, err } - lastTenantIndex = idx + lastTenantIdx = idx if consumeFunc != nil { err = consumeFunc(request) } - return lastTenantIndex, err + return lastTenantIdx, err } func TestRequestQueue_RegisterAndUnregisterQuerierWorkerConnections(t *testing.T) { @@ -580,7 +488,8 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBe querier2wg.Add(1) go func() { defer querier2wg.Done() - _, _, err := queue.WaitForRequestForQuerier(ctx, FirstTenant(), "querier-2") + dequeueReq := NewQuerierWorkerDequeueRequest(querier2Conn, FirstTenant()) + _, _, err := queue.AwaitRequestForQuerier(dequeueReq) require.NoError(t, err) }() @@ -675,7 +584,8 @@ func TestRequestQueue_GetNextRequestForQuerier_ReshardNotifiedCorrectlyForMultip querier2wg.Add(1) go func() { defer querier2wg.Done() - _, _, err := queue.WaitForRequestForQuerier(ctx, FirstTenant(), "querier-2") + dequeueReq := NewQuerierWorkerDequeueRequest(querier2Conn, FirstTenant()) + _, _, err := queue.AwaitRequestForQuerier(dequeueReq) require.NoError(t, err) }() @@ -741,24 +651,29 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldReturnAfterContextCancelled querier1Conn := NewUnregisteredQuerierWorkerConn(context.Background(), querierID) require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(querier1Conn)) - // Calling WaitForRequestForQuerier with a context that is already cancelled should fail immediately. + // Calling AwaitRequestForQuerier with a context that is already cancelled should fail immediately. deadCtx, cancel := context.WithCancel(context.Background()) cancel() - r, tenant, err := queue.WaitForRequestForQuerier(deadCtx, FirstTenant(), querierID) + querier1Conn.ctx = deadCtx + + dequeueReq := NewQuerierWorkerDequeueRequest(querier1Conn, FirstTenant()) + r, tenant, err := queue.AwaitRequestForQuerier(dequeueReq) assert.Nil(t, r) assert.Equal(t, FirstTenant(), tenant) assert.ErrorIs(t, err, context.Canceled) - // Further, a context canceled after WaitForRequestForQuerier publishes a request should also fail. + // Further, a context canceled after AwaitRequestForQuerier publishes a request should also fail. errChan := make(chan error) ctx, cancel := context.WithCancel(context.Background()) + querier1Conn.ctx = ctx go func() { - _, _, err := queue.WaitForRequestForQuerier(ctx, FirstTenant(), querierID) + dequeueReq := NewQuerierWorkerDequeueRequest(querier1Conn, FirstTenant()) + _, _, err := queue.AwaitRequestForQuerier(dequeueReq) errChan <- err }() - time.Sleep(20 * time.Millisecond) // Wait for WaitForRequestForQuerier to be waiting for a query. + time.Sleep(20 * time.Millisecond) // Wait for AwaitRequestForQuerier to be waiting for a query. 
cancel()
 
 	select {
@@ -805,7 +720,8 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldReturnImmediatelyIfQuerierI
 
 		queue.SubmitNotifyQuerierShutdown(ctx, querierID)
 
-		_, _, err = queue.WaitForRequestForQuerier(context.Background(), FirstTenant(), querierID)
+		dequeueReq := NewQuerierWorkerDequeueRequest(querierConn, FirstTenant())
+		_, _, err = queue.AwaitRequestForQuerier(dequeueReq)
 		require.EqualError(t, err, "querier has informed the scheduler it is shutting down")
 	})
 }
@@ -833,7 +749,7 @@ func TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSend
 	require.NoError(t, err)
 
 	// bypassing queue dispatcher loop for direct usage of the queueBroker and
-	// passing a waitingQuerierConn for a canceled querier connection
+	// passing a QuerierWorkerDequeueRequest for a canceled querier connection
 	queueBroker := newQueueBroker(queue.maxOutstandingPerTenant, tt.useMultiAlgoTreeQueue, tt.prioritizeQueryComponents, queue.forgetDelay)
 	queueBroker.addQuerierWorkerConn(NewUnregisteredQuerierWorkerConn(context.Background(), querierID))
@@ -872,16 +788,19 @@ func TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSend
 	}
 
 	ctx, cancel := context.WithCancel(context.Background())
-	call := &waitingQuerierConn{
-		querierConnCtx:  ctx,
-		querierID:       QuerierID(querierID),
+	call := &QuerierWorkerDequeueRequest{
+		QuerierWorkerConn: &QuerierWorkerConn{
+			ctx:       ctx,
+			QuerierID: QuerierID(querierID),
+			WorkerID:  0,
+		},
 		lastTenantIndex: FirstTenant(),
-		recvChan:        make(chan requestForQuerier),
+		recvChan:        make(chan querierWorkerDequeueResponse),
 	}
 	cancel() // ensure querier context done before send is attempted
 
 	// send to querier will fail but method returns true,
-	// indicating the QuerierWorkerDequeueRequest should not be re-submitted for this querier
+	// indicating the QuerierWorkerDequeueRequest should not be re-submitted for this querier
 	require.True(t, queue.trySendNextRequestForQuerier(call))
 	// assert request was re-enqueued for tenant after failed send
 	// TODO (casie): Clean this up when deprecating legacy tree queue
diff --git a/pkg/scheduler/queue/tenant_queues.go b/pkg/scheduler/queue/tenant_queues.go
index 4994d10b7d1..ac983328722 100644
--- a/pkg/scheduler/queue/tenant_queues.go
+++ b/pkg/scheduler/queue/tenant_queues.go
@@ -19,7 +19,7 @@ type QuerierID string
 
 type tenantRequest struct {
 	tenantID TenantID
-	req      Request
+	req      QueryRequest
 }
 
 // queueBroker encapsulates access to tenant queues for pending requests
diff --git a/pkg/scheduler/queue/tree_queue_algo_querier_worker_queue_priority.go b/pkg/scheduler/queue/tree_queue_algo_querier_worker_queue_priority.go
new file mode 100644
index 00000000000..0afd10e8150
--- /dev/null
+++ b/pkg/scheduler/queue/tree_queue_algo_querier_worker_queue_priority.go
@@ -0,0 +1,161 @@
+// SPDX-License-Identifier: AGPL-3.0-only
+
+package queue
+
+import "slices"
+
+// QuerierWorkerQueuePriorityAlgo implements QueuingAlgorithm by mapping worker IDs to a queue node to prioritize.
+// Querier-workers' prioritized queue nodes are calculated by the integer workerID % len(nodeOrder).
+// This distribution of workers across query component subtrees ensures that when one query component is experiencing
+// high latency, about 25% of querier-workers continue prioritizing queries for unaffected components.
+//
+// This significantly outperforms the previous round-robin approach which simply rotated through the node order.
+// Although a vanilla round-robin algorithm will select a given query-component node 1 / 4 of the time,
+// in situations of high latency on a query component, the utilization of the querier-worker connections
+// as measured by inflight query processing time will grow asymptotically to be dominated by the slow query component.
+//
+// There are 4 possible query components: "ingester", "store-gateway", "ingester-and-store-gateway", and "unknown".
+// When all 4 queue nodes exist, approximately 1 / 4 of the querier-workers are prioritized to each queue node.
+// This algorithm requires a minimum of 4 querier-workers per querier to prevent queue starvation.
+// The minimum is enforced in the queriers by overriding -querier.max-concurrent if necessary.
+//
+// MultiQueuingAlgorithmTreeQueue always deletes empty leaf nodes and nodes with no children after a dequeue operation,
+// and only recreates the queue nodes when a new query request is enqueued which requires that path through the tree.
+// QuerierWorkerQueuePriorityAlgo responds by removing or re-adding the query component nodes to the nodeOrder.
+// This has two implications for the distribution of workers across queue nodes:
+//  1. The modulo operation takes the worker ID modulo 1, 2, 3, or 4 depending on the number of node types
+//     currently present in the nodeOrder, which can change which node a worker ID is prioritized for.
+//  2. The nodeOrder changes as queues are deleted and re-created, so the worker ID-to-node mapping changes
+//     as the random enqueue order places query component nodes in different positions in the order.
+//
+// These changes in nodeOrder guarantee that when the number of querier-workers is not evenly divisible
+// by the number of query component nodes, the randomized changes in nodeOrder over time distribute the workers
+// more evenly across query component nodes than a fixed node length and order would.
+//
+// A given worker ID is prioritized to *start* at a given queue node, but is not assigned strictly to that node.
+// During any period without change to the nodeOrder, the same worker ID consistently starts at the same queue node,
+// but moves on to other nodes if it cannot dequeue a request from the subtree of its first prioritized queue node.
+// Continuing to search through other query-component nodes and their subtrees minimizes idle querier-worker capacity.
+//
+// A querier-worker can process queries for nodes it has not prioritized when this QueuingAlgorithm is applied at the
+// highest layer of the tree and the tenant-querier-shuffle-shard QueuingAlgorithm is applied at the second layer of the
+// tree. If shuffle-sharding is enabled, a querier-worker that prioritizes ingester-only queries may not find
+// ingester-only queries for any tenant it is assigned to, and will then move on to the next query component subtree. E.g.:
+//
+//  1. This algorithm has nodeOrder: ["ingester", "store-gateway", "ingester-and-store-gateway", "unknown"].
+//
+//  2. A querier-worker with workerID 0 requests to dequeue; it prioritizes the "ingester" queue node.
+//
+//  3. The dequeue operation attempts to dequeue first from the child nodes of the "ingester" node,
+//     where each child node is a tenant-specific queue of ingester-only queries. The tenantQuerierAssignments
+//     QueuingAlgorithm checks if any of its tenant queue nodes is sharded to this querier, and finds none.
+//
+//  4. The dequeue operation walks back up to the QuerierWorkerQueuePriorityAlgo level, not having dequeued anything.
+// The QuerierWorkerQueuePriorityAlgo moves on and selects the next query-component node in the nodeOrder, +// and recurs again to search that next subtree for tenant queue nodes sharded to this querier, from step 3, etc., +// until a dequeue-able tenant queue node is found, or every query component node subtree has been exhausted. +type QuerierWorkerQueuePriorityAlgo struct { + currentQuerierWorker int + currentNodeOrderIndex int + nodeOrder []string + nodeCounts map[string]int + nodesChecked int +} + +func NewQuerierWorkerQueuePriorityAlgo() *QuerierWorkerQueuePriorityAlgo { + return &QuerierWorkerQueuePriorityAlgo{ + nodeCounts: make(map[string]int), + } +} + +func (qa *QuerierWorkerQueuePriorityAlgo) SetCurrentQuerierWorker(workerID int) { + qa.currentQuerierWorker = workerID + if len(qa.nodeOrder) == 0 { + qa.currentNodeOrderIndex = 0 + } else { + qa.currentNodeOrderIndex = workerID % len(qa.nodeOrder) + } +} + +func (qa *QuerierWorkerQueuePriorityAlgo) wrapCurrentNodeOrderIndex(increment bool) { + if increment { + qa.currentNodeOrderIndex++ + } + + if qa.currentNodeOrderIndex >= len(qa.nodeOrder) { + qa.currentNodeOrderIndex = 0 + } +} + +func (qa *QuerierWorkerQueuePriorityAlgo) checkedAllNodes() bool { + return qa.nodesChecked == len(qa.nodeOrder) +} + +func (qa *QuerierWorkerQueuePriorityAlgo) addChildNode(parent, child *Node) { + // add child node to its parent's queueMap + parent.queueMap[child.Name()] = child + + // add child node to the global node order if it did not already exist + if qa.nodeCounts[child.Name()] == 0 { + if qa.currentNodeOrderIndex == 0 { + // special case; since we are at the beginning of the order, + // only a simple append is needed to add the new node to the end, + // which also creates a more intuitive initial order for tests + qa.nodeOrder = append(qa.nodeOrder, child.Name()) + } else { + // insert into the order behind current child queue index + // to prevent the possibility of new nodes continually jumping the line + qa.nodeOrder = slices.Insert(qa.nodeOrder, qa.currentNodeOrderIndex, child.Name()) + // since the new node was inserted into the order behind the current node, + // the currentNodeOrderIndex must be pushed forward to remain pointing at the same node + qa.wrapCurrentNodeOrderIndex(true) + } + } + + // add child node to global nodeCounts + qa.nodeCounts[child.Name()]++ +} + +func (qa *QuerierWorkerQueuePriorityAlgo) dequeueSelectNode(node *Node) (*Node, bool) { + currentNodeName := qa.nodeOrder[qa.currentNodeOrderIndex] + if node, ok := node.queueMap[currentNodeName]; ok { + qa.nodesChecked++ + return node, qa.checkedAllNodes() + } + return nil, qa.checkedAllNodes() +} + +func (qa *QuerierWorkerQueuePriorityAlgo) dequeueUpdateState(node *Node, dequeuedFrom *Node) { + // if the child node is nil, we haven't done anything to the tree; return early + if dequeuedFrom == nil { + return + } + + // if the child is empty, we should delete it + if dequeuedFrom != node && dequeuedFrom.IsEmpty() { + childName := dequeuedFrom.Name() + + // decrement the global nodeCounts + qa.nodeCounts[childName]-- + + // only delete from global nodeOrder if the global nodeCount is now zero + // meaning there are no nodes with this name remaining in the tree + if qa.nodeCounts[childName] == 0 { + childIndex := slices.Index(qa.nodeOrder, childName) + if childIndex != -1 { + qa.nodeOrder = slices.Delete(qa.nodeOrder, childIndex, childIndex+1) + // we do not need to increment currentNodeOrderIndex + // the node removed is always the node pointed to by 
currentNodeOrderIndex + // so removing it sets our currentNodeOrderIndex to the next node already + // we will wrap if needed, as currentNodeOrderIndex may be pointing past the end of the slice now + qa.wrapCurrentNodeOrderIndex(false) + } + } + + // delete child node from its parent's queueMap + delete(node.queueMap, childName) + } + + // reset state after successful dequeue + qa.nodesChecked = 0 +} diff --git a/pkg/scheduler/queue/tree_queue_algo_querier_worker_queue_priority_test.go b/pkg/scheduler/queue/tree_queue_algo_querier_worker_queue_priority_test.go new file mode 100644 index 00000000000..dc40462c416 --- /dev/null +++ b/pkg/scheduler/queue/tree_queue_algo_querier_worker_queue_priority_test.go @@ -0,0 +1,345 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package queue + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// Test for the expected behavior of the queue algorithm when there is only a single worker. +// Having a number of active workers less than the number of queue nodes is the worst-case scenario +// for the algorithm, potentially resulting in complete queue starvation for some nodes. +// Workers will keep coming back to their assigned queue as long as the queue is not empty, +// and any queues without assigned workers may wait indefinitely until other queue nodes are empty. +func TestQuerierWorkerQueuePriority_SingleWorkerBehavior(t *testing.T) { + type opType string + enqueue := opType("enqueue") + dequeue := opType("dequeue") + + type op struct { + kind opType + path QueuePath + obj any + } + + operationOrder := []op{ + // enqueue 2 objects each to 3 different children; + // nodes are only added to a rotation on first enqueue, + // so only order of first enqueue sets the dequeue order + {enqueue, QueuePath{"child-1"}, "obj-1"}, // child-1 node created + {enqueue, QueuePath{"child-2"}, "obj-2"}, // child-2 node created + {enqueue, QueuePath{"child-3"}, "obj-3"}, // child-3 node created + // order of nodes is set, further enqueues in a different order will not change it + {enqueue, QueuePath{"child-3"}, "obj-4"}, + {enqueue, QueuePath{"child-2"}, "obj-5"}, + {enqueue, QueuePath{"child-1"}, "obj-6"}, + + // without changing worker ID, dequeue will clear full queue nodes, + // starting with the first node in the order then moving on to the next empty queue + {dequeue, QueuePath{"child-1"}, "obj-1"}, + {dequeue, QueuePath{"child-1"}, "obj-6"}, + // child-1 is now empty and removed from rotation + {dequeue, QueuePath{"child-2"}, "obj-2"}, + {dequeue, QueuePath{"child-2"}, "obj-5"}, + // child-2 is now empty and removed from rotation + {dequeue, QueuePath{"child-3"}, "obj-3"}, + + // enqueue for child-1 again to verify it is added back to rotation + {enqueue, QueuePath{"child-1"}, "obj-7"}, + + // child-3 is still next; child-1 was added back to rotation + {dequeue, QueuePath{"child-3"}, "obj-4"}, + // child-3 is now empty and removed from rotation; only child-1 remains + {dequeue, QueuePath{"child-1"}, "obj-7"}, + // nothing left to dequeue + {dequeue, QueuePath{}, nil}, + } + + querierWorkerPrioritizationQueueAlgo := NewQuerierWorkerQueuePriorityAlgo() + + tree, err := NewTree(querierWorkerPrioritizationQueueAlgo, &roundRobinState{}) + require.NoError(t, err) + + for _, operation := range operationOrder { + if operation.kind == enqueue { + err = tree.EnqueueBackByPath(operation.path, operation.obj) + require.NoError(t, err) + } + if operation.kind == dequeue { + path, obj := tree.Dequeue() + require.Equal(t, 
operation.path, path) + require.Equal(t, operation.obj, obj) + } + } +} + +// Test for the expected behavior of the queue algorithm when there are multiple workers +// and when the querier-worker queue priority algorithm is used only at the highest layer of the tree. +// When at the highest layer of the tree, the global node count for any existing node is always 1, +// so the queue algorithm will always delete the node from the global node order rotation after it is emptied. +func TestQuerierWorkerQueuePriority_StartPositionByWorker(t *testing.T) { + querierWorkerPrioritizationQueueAlgo := NewQuerierWorkerQueuePriorityAlgo() + + tree, err := NewTree(querierWorkerPrioritizationQueueAlgo, &roundRobinState{}) + require.NoError(t, err) + + // enqueue 3 objects each to 3 different children; + require.NoError(t, tree.EnqueueBackByPath(QueuePath{ingesterQueueDimension}, "obj-1")) // ingester node created + require.NoError(t, tree.EnqueueBackByPath(QueuePath{storeGatewayQueueDimension}, "obj-2")) // store-gateway node created + require.NoError(t, tree.EnqueueBackByPath(QueuePath{ingesterAndStoreGatewayQueueDimension}, "obj-3")) // ingester-and-store-gateway node created + require.NoError(t, tree.EnqueueBackByPath(QueuePath{ingesterQueueDimension}, "obj-4")) + require.NoError(t, tree.EnqueueBackByPath(QueuePath{storeGatewayQueueDimension}, "obj-5")) + require.NoError(t, tree.EnqueueBackByPath(QueuePath{ingesterAndStoreGatewayQueueDimension}, "obj-6")) + require.NoError(t, tree.EnqueueBackByPath(QueuePath{ingesterQueueDimension}, "obj-7")) + require.NoError(t, tree.EnqueueBackByPath(QueuePath{storeGatewayQueueDimension}, "obj-8")) + require.NoError(t, tree.EnqueueBackByPath(QueuePath{ingesterAndStoreGatewayQueueDimension}, "obj-9")) + + // node order was set by initial enqueue order; + // order will remain until a node is deleted for being empty or a new node is added by an enqueue + expectedInitialNodeOrder := []string{ingesterQueueDimension, storeGatewayQueueDimension, ingesterAndStoreGatewayQueueDimension} + assert.Equal(t, expectedInitialNodeOrder, querierWorkerPrioritizationQueueAlgo.nodeOrder) + + // with 3 queues present, first node to be dequeued from is determined by worker ID % 3 + querierWorkerPrioritizationQueueAlgo.SetCurrentQuerierWorker(0) + path, obj := tree.Dequeue() + assert.Equal(t, QueuePath{ingesterQueueDimension}, path) + assert.Equal(t, "obj-1", obj) + + querierWorkerPrioritizationQueueAlgo.SetCurrentQuerierWorker(1) + path, obj = tree.Dequeue() + assert.Equal(t, QueuePath{storeGatewayQueueDimension}, path) + assert.Equal(t, "obj-2", obj) + + querierWorkerPrioritizationQueueAlgo.SetCurrentQuerierWorker(2) + path, obj = tree.Dequeue() + assert.Equal(t, QueuePath{ingesterAndStoreGatewayQueueDimension}, path) + assert.Equal(t, "obj-3", obj) + + // worker IDs can come in "out of order" to dequeue, and they will still start at the correct queue + querierWorkerPrioritizationQueueAlgo.SetCurrentQuerierWorker(5) + path, obj = tree.Dequeue() + assert.Equal(t, QueuePath{ingesterAndStoreGatewayQueueDimension}, path) + assert.Equal(t, "obj-6", obj) + + querierWorkerPrioritizationQueueAlgo.SetCurrentQuerierWorker(3) + path, obj = tree.Dequeue() + assert.Equal(t, QueuePath{ingesterQueueDimension}, path) + assert.Equal(t, "obj-4", obj) + + querierWorkerPrioritizationQueueAlgo.SetCurrentQuerierWorker(4) + path, obj = tree.Dequeue() + assert.Equal(t, QueuePath{storeGatewayQueueDimension}, path) + assert.Equal(t, "obj-5", obj) + + // only 1 item left in each queue; as queue nodes are emptied 
and deleted, + // worker IDs will be remapped to different node types by the modulo operation + querierWorkerPrioritizationQueueAlgo.SetCurrentQuerierWorker(0) + path, obj = tree.Dequeue() + assert.Equal(t, QueuePath{ingesterQueueDimension}, path) + assert.Equal(t, "obj-7", obj) + + // ingester queue empty and deleted: 2 queues left are ["store-gateway", "ingester-and-store-gateway"] + // with 2 queues present, first node to be dequeued from is determined by worker ID % 2 + querierWorkerPrioritizationQueueAlgo.SetCurrentQuerierWorker(1) + path, obj = tree.Dequeue() + assert.Equal(t, QueuePath{ingesterAndStoreGatewayQueueDimension}, path) + assert.Equal(t, "obj-9", obj) + + // ingester-and-store-gateway queue empty and deleted: 1 queue left is just ["store-gateway"] + // every worker will dequeue from the same queue since there is only 1 left + querierWorkerPrioritizationQueueAlgo.SetCurrentQuerierWorker(999) + path, obj = tree.Dequeue() + assert.Equal(t, QueuePath{storeGatewayQueueDimension}, path) + assert.Equal(t, "obj-8", obj) + + path, obj = tree.Dequeue() + assert.Equal(t, QueuePath{}, path) + // tree is now empty + assert.Nil(t, obj) +} + +// Test for the expected behavior of the queue algorithm when there are multiple workers +// and when the querier-worker queue priority algorithm is used at a layer of the tree below the highest layer. +// +// The node types managed by this queue algorithm can have many instances created within a tree layer, +// as multiple nodes in the parent tree layer can each have a child node of each type. +// +// The algorithm must only add the node type to the global node order rotation +// if the node is the first of its type created in the tree layer. +// The algorithm must only remove the node type from the global node order rotation +// once all instances of the node type are emptied and removed from the tree layer. +func TestQuerierWorkerQueuePriority_StartPositionByWorker_MultipleNodeCountsInTree(t *testing.T) { + querierWorkerPrioritizationQueueAlgo := NewQuerierWorkerQueuePriorityAlgo() + + tree, err := NewTree(&roundRobinState{}, querierWorkerPrioritizationQueueAlgo, &roundRobinState{}) + require.NoError(t, err) + + // enqueue 2 objects each to 2 different children, each with 3 different grandchildren; + // the highest layer is managed by a vanilla round-robin rotation + // and the second-highest layer is managed by the querier-worker queue priority algorithm. + // The global node order rotation for the querier-worker queue priority algorithm is only affected + // when the first node of a type is created in the tree layer or the last node of a type is emptied and removed. + // To keep things brief "i", "sg", and "isg" are used for "ingester", "store-gateway", and "ingester-and-store-gateway". 
+ + // first creation of an ingester node anywhere adds ingester node to global node order + require.NoError(t, tree.EnqueueBackByPath(QueuePath{"a", ingesterQueueDimension}, "obj-a-i-1")) + // same for store-gateway node + require.NoError(t, tree.EnqueueBackByPath(QueuePath{"b", storeGatewayQueueDimension}, "obj-b-sg-1")) + // same for ingester-and-store-gateway node + require.NoError(t, tree.EnqueueBackByPath(QueuePath{"b", ingesterAndStoreGatewayQueueDimension}, "obj-b-isg-1")) + // further node creation of those same types does not affect the global order + require.NoError(t, tree.EnqueueBackByPath(QueuePath{"b", ingesterQueueDimension}, "obj-b-i-1")) + require.NoError(t, tree.EnqueueBackByPath(QueuePath{"a", storeGatewayQueueDimension}, "obj-a-sg-1")) + require.NoError(t, tree.EnqueueBackByPath(QueuePath{"a", ingesterAndStoreGatewayQueueDimension}, "obj-a-isg-1")) + + // fill in a second object for each of the 6 paths created above + require.NoError(t, tree.EnqueueBackByPath(QueuePath{"a", ingesterQueueDimension}, "obj-a-i-2")) + require.NoError(t, tree.EnqueueBackByPath(QueuePath{"a", storeGatewayQueueDimension}, "obj-a-sg-2")) + require.NoError(t, tree.EnqueueBackByPath(QueuePath{"a", ingesterAndStoreGatewayQueueDimension}, "obj-a-isg-2")) + require.NoError(t, tree.EnqueueBackByPath(QueuePath{"b", ingesterQueueDimension}, "obj-b-i-2")) + require.NoError(t, tree.EnqueueBackByPath(QueuePath{"b", storeGatewayQueueDimension}, "obj-b-sg-2")) + require.NoError(t, tree.EnqueueBackByPath(QueuePath{"b", ingesterAndStoreGatewayQueueDimension}, "obj-b-isg-2")) + + /* balanced tree structure before dequeuing: + root + ├── a + │ ├── ingester + │ │ ├── obj-a-i-1 + │ │ ├── obj-a-i-2 + │ ├── store-gateway + │ │ ├── obj-a-sg-1 + │ │ ├── obj-a-sg-2 + │ └── ingester-and-store-gateway + │ ├── obj-a-isg-1 + │ ├── obj-a-isg-2 + ├── b + │ ├── ingester + │ │ ├── obj-b-i-1 + │ │ ├── obj-b-i-2 + │ ├── store-gateway + │ │ ├── obj-b-sg-1 + │ │ ├── obj-b-sg-2 + │ └── ingester-and-store-gateway + │ ├── obj-b-isg-1 + │ ├── obj-b-isg-2 + */ + + // node order was set by initial enqueue order; + assert.Equal(t, + []string{ingesterQueueDimension, storeGatewayQueueDimension, ingesterAndStoreGatewayQueueDimension}, + querierWorkerPrioritizationQueueAlgo.nodeOrder, + ) + + // show two-layer behavior (with top layer as vanilla round-robin) before any nodes are deleted + // with 3 queues present, first node to be dequeued from is determined by worker ID % 3 + querierWorkerPrioritizationQueueAlgo.SetCurrentQuerierWorker(0) + path, obj := tree.Dequeue() + assert.Equal(t, QueuePath{"a", ingesterQueueDimension}, path) + assert.Equal(t, "obj-a-i-1", obj) + + querierWorkerPrioritizationQueueAlgo.SetCurrentQuerierWorker(1) + path, obj = tree.Dequeue() + assert.Equal(t, QueuePath{"b", storeGatewayQueueDimension}, path) + assert.Equal(t, "obj-b-sg-1", obj) + + querierWorkerPrioritizationQueueAlgo.SetCurrentQuerierWorker(2) + path, obj = tree.Dequeue() + assert.Equal(t, QueuePath{"a", ingesterAndStoreGatewayQueueDimension}, path) + assert.Equal(t, "obj-a-isg-1", obj) + + // worker IDs can come in "out of order" to dequeue, and they will still start at the correct queue + querierWorkerPrioritizationQueueAlgo.SetCurrentQuerierWorker(5) + path, obj = tree.Dequeue() + assert.Equal(t, QueuePath{"b", ingesterAndStoreGatewayQueueDimension}, path) + assert.Equal(t, "obj-b-isg-1", obj) + + querierWorkerPrioritizationQueueAlgo.SetCurrentQuerierWorker(4) + path, obj = tree.Dequeue() + assert.Equal(t, QueuePath{"a", 
storeGatewayQueueDimension}, path) + assert.Equal(t, "obj-a-sg-1", obj) + + querierWorkerPrioritizationQueueAlgo.SetCurrentQuerierWorker(3) + path, obj = tree.Dequeue() + assert.Equal(t, QueuePath{"b", ingesterQueueDimension}, path) + assert.Equal(t, "obj-b-i-1", obj) + + // only 1 item left in each queue but there are still two queues of each type in the layer; + // as queue nodes are emptied and deleted, worker IDs will *only* be remapped to different node types + // when the last node of each type is deleted and the node type is removed from the global order. + + // dequeue with a worker ID mapped to the ingester node type + querierWorkerPrioritizationQueueAlgo.SetCurrentQuerierWorker(0) + path, obj = tree.Dequeue() + assert.Equal(t, QueuePath{"a", ingesterQueueDimension}, path) + assert.Equal(t, "obj-a-i-2", obj) + // node at path "a/ingester" is now empty and deleted but "ingester" is still in the global order + assert.Equal(t, + []string{ingesterQueueDimension, storeGatewayQueueDimension, ingesterAndStoreGatewayQueueDimension}, + querierWorkerPrioritizationQueueAlgo.nodeOrder, + ) + + // dequeue again with a worker ID mapped to the ingester node type + querierWorkerPrioritizationQueueAlgo.SetCurrentQuerierWorker(3) + path, obj = tree.Dequeue() + assert.Equal(t, QueuePath{"b", ingesterQueueDimension}, path) + assert.Equal(t, "obj-b-i-2", obj) + // the last node of the "ingester" type is empty and deleted, it is removed from the global order + assert.Equal(t, + []string{storeGatewayQueueDimension, ingesterAndStoreGatewayQueueDimension}, + querierWorkerPrioritizationQueueAlgo.nodeOrder, + ) + + // subsequent dequeues demonstrate that worker IDs are remapped to the remaining node types + + // dequeue with a worker ID mapped to the ingester-and-store-gateway node type + querierWorkerPrioritizationQueueAlgo.SetCurrentQuerierWorker(1) + path, obj = tree.Dequeue() + assert.Equal(t, QueuePath{"a", ingesterAndStoreGatewayQueueDimension}, path) + assert.Equal(t, "obj-a-isg-2", obj) + // node at path "a/ingester-and-store-gateway" is now empty and deleted but "ingester-and-store-gateway" is still in the global order + assert.Equal(t, + []string{storeGatewayQueueDimension, ingesterAndStoreGatewayQueueDimension}, + querierWorkerPrioritizationQueueAlgo.nodeOrder, + ) + + // dequeue with a worker ID mapped to the store-gateway node type + querierWorkerPrioritizationQueueAlgo.SetCurrentQuerierWorker(2) + path, obj = tree.Dequeue() + assert.Equal(t, QueuePath{"b", storeGatewayQueueDimension}, path) + assert.Equal(t, "obj-b-sg-2", obj) + // node at path "b/store-gateway" is now empty and deleted but "store-gateway" is still in the global order + assert.Equal(t, + []string{storeGatewayQueueDimension, ingesterAndStoreGatewayQueueDimension}, + querierWorkerPrioritizationQueueAlgo.nodeOrder, + ) + + // dequeue with a worker ID mapped to the ingester-and-store-gateway node type + querierWorkerPrioritizationQueueAlgo.SetCurrentQuerierWorker(1) + path, obj = tree.Dequeue() + assert.Equal(t, QueuePath{"b", ingesterAndStoreGatewayQueueDimension}, path) + assert.Equal(t, "obj-b-isg-2", obj) + // the last node of the "ingester-and-store-gateway" type is empty and deleted, it is removed from the global order + assert.Equal(t, + []string{storeGatewayQueueDimension}, + querierWorkerPrioritizationQueueAlgo.nodeOrder, + ) + + // no need to re-assign the current querier-worker ID; there is only 1 node type left + // so any worker ID will dequeue from the same node type + path, obj = tree.Dequeue() + assert.Equal(t, 
QueuePath{"a", storeGatewayQueueDimension}, path) + assert.Equal(t, "obj-a-sg-2", obj) + // the last node of the "store-gateway" type is empty and deleted, it is removed from the global order + assert.Equal(t, + []string{}, + querierWorkerPrioritizationQueueAlgo.nodeOrder, + ) + + path, obj = tree.Dequeue() + assert.Equal(t, QueuePath{}, path) + // tree is now empty + assert.Nil(t, obj) +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index c9773098fa2..e5c5cb51b2c 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -440,18 +440,19 @@ func (s *Scheduler) QuerierLoop(querier schedulerpb.SchedulerForQuerier_QuerierL } defer s.requestQueue.SubmitUnregisterQuerierWorkerConn(querierWorkerConn) - lastUserIndex := queue.FirstTenant() + lastTenantIdx := queue.FirstTenant() // In stopping state scheduler is not accepting new queries, but still dispatching queries in the queues. for s.isRunningOrStopping() { - queueReq, idx, err := s.requestQueue.WaitForRequestForQuerier(querier.Context(), lastUserIndex, querierID) + dequeueReq := queue.NewQuerierWorkerDequeueRequest(querierWorkerConn, lastTenantIdx) + queryReq, idx, err := s.requestQueue.AwaitRequestForQuerier(dequeueReq) if err != nil { return s.transformRequestQueueError(err) } - lastUserIndex = idx + lastTenantIdx = idx - schedulerReq := queueReq.(*queue.SchedulerRequest) + schedulerReq := queryReq.(*queue.SchedulerRequest) queueTime := time.Since(schedulerReq.EnqueueTime) additionalQueueDimensionLabels := strings.Join(schedulerReq.AdditionalQueueDimensions, ":") @@ -474,7 +475,7 @@ func (s *Scheduler) QuerierLoop(querier schedulerpb.SchedulerForQuerier_QuerierL // remove from pending requests s.cancelRequestAndRemoveFromPending(schedulerReq.Key(), "request cancelled") s.requestQueue.QueryComponentUtilization.MarkRequestCompleted(schedulerReq) - lastUserIndex = lastUserIndex.ReuseLastTenant() + lastTenantIdx = lastTenantIdx.ReuseLastTenant() continue }
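Finally, the worker-ID-to-node mapping documented in the QuerierWorkerQueuePriorityAlgo comment can be checked in isolation. An editorial sketch with hypothetical numbers (10 workers and a fixed nodeOrder, whereas the real order is set by enqueue order): with all 4 nodes present, `workerID % 4` yields a 3/3/2/2 split, which the nodeOrder churn described in that comment evens out over time.

```go
package main

import "fmt"

func main() {
	// illustrative fixed order; at runtime nodeOrder is set by first-enqueue order
	nodeOrder := []string{"ingester", "store-gateway", "ingester-and-store-gateway", "unknown"}

	// each worker is prioritized to start at nodeOrder[workerID % len(nodeOrder)]
	counts := map[string]int{}
	for workerID := 0; workerID < 10; workerID++ {
		counts[nodeOrder[workerID%len(nodeOrder)]]++
	}
	fmt.Println(counts)
	// map[ingester:3 ingester-and-store-gateway:2 store-gateway:3 unknown:2]
}
```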