From 74d5fe8dace2bb79cba5d2242909e4a77b149d86 Mon Sep 17 00:00:00 2001 From: Sumit Anvekar <2293969+sumitanvekar@users.noreply.github.com> Date: Tue, 30 Nov 2021 20:05:38 +0530 Subject: [PATCH] Add support for concurrency ramp up (#202) * Add support for concurrency ramp up --- cmd/flags/root.go | 7 +++++++ cmd/root.go | 1 + docs/about/getting-started.md | 1 + internal/pkg/warmup/warmup.go | 10 ++++++++++ test/root_test.go | 1 + 5 files changed, 20 insertions(+) diff --git a/cmd/flags/root.go b/cmd/flags/root.go index 0dffebd..7054416 100644 --- a/cmd/flags/root.go +++ b/cmd/flags/root.go @@ -30,6 +30,7 @@ type Root struct { MaxDurationSeconds int Concurrency int RequestDelayMilliseconds int + ConcurrencyTargetSeconds int ExitAfterWarmup bool FailReadiness bool FileProbe @@ -48,6 +49,7 @@ func (r *Root) InitFlags() { flag.IntVar(&r.MaxDurationSeconds, "max-duration-seconds", 60, "Max duration in seconds after which warm up will stop making requests") flag.IntVar(&r.Concurrency, "concurrency", 2, "Number of concurrent requests for warm up") flag.IntVar(&r.RequestDelayMilliseconds, "request-delay-milliseconds", 500, "Delay in milliseconds between requests") + flag.IntVar(&r.ConcurrencyTargetSeconds, "concurrency-target-seconds", 0, "Time taken to reach expected concurrency. This is useful to ramp up traffic.") flag.BoolVar(&r.ExitAfterWarmup, "exit-after-warmup", false, "If warm up process should finish after completion. This is useful to prevent container restarts.") flag.BoolVar(&r.FailReadiness, "fail-readiness", false, "If set to true readiness will fail if no requests were sent.") @@ -63,6 +65,11 @@ func (r *Root) GetMaxDurationSeconds() int { return r.MaxDurationSeconds } +// GetConcurrencyTargetSeconds returns the value of the concurrency-target-seconds parameter. +func (r *Root) GetConcurrencyTargetSeconds() int { + return r.ConcurrencyTargetSeconds +} + // GetConcurrency returns the value of the concurrency parameter. func (r *Root) GetConcurrency() int { return r.Concurrency diff --git a/cmd/root.go b/cmd/root.go index a6efc00..4930dd6 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -90,6 +90,7 @@ func run() int { GrpcRequests: grpcRequests, HttpHeaders: opts.GetWarmupHTTPHeaders(), RequestDelayMilliseconds: opts.RequestDelayMilliseconds, + ConcurrencyTargetSeconds: opts.GetConcurrencyTargetSeconds(), } wp.Run(hasHttpRequests, hasGrpcRequests, &requestsSentCounter) } else { diff --git a/docs/about/getting-started.md b/docs/about/getting-started.md index 4b99077..59af1cc 100644 --- a/docs/about/getting-started.md +++ b/docs/about/getting-started.md @@ -31,6 +31,7 @@ The application receives a number of command-line flags including the requests t | -target-readiness-port | int | same as -target-http-port | The port used for target readiness probe | | -target-readiness-protocol | string | http | Protocol to be used for readiness check. One of [`http`, `grpc`] | | -max-duration-seconds | int | 60 | Maximum duration in seconds after which warm up will stop making requests | +| -concurrency-target-seconds | int | 0 | Time taken to reach expected concurrency. This is useful to ramp up traffic. | ### Warmup request A warmup request can be an HTTP one (over REST) or a gRPC one. diff --git a/internal/pkg/warmup/warmup.go b/internal/pkg/warmup/warmup.go index 101e4fd..9dea943 100644 --- a/internal/pkg/warmup/warmup.go +++ b/internal/pkg/warmup/warmup.go @@ -34,6 +34,7 @@ type Warmup struct { HttpHeaders []string GrpcRequests chan grpc.Request RequestDelayMilliseconds int + ConcurrencyTargetSeconds int } // Run sends requests to the target using goroutines. @@ -41,6 +42,7 @@ func (w Warmup) Run(hasHttpRequests bool, hasGrpcRequests bool, requestsSentCoun rand.Seed(time.Now().UnixNano()) // initialize seed only once to prevent deterministic/repeated calls every time we run var wg sync.WaitGroup + var rampUpInterval = w.ConcurrencyTargetSeconds / w.Concurrency if hasGrpcRequests { // connect to gRPC server once and only if there are gRPC requests @@ -51,6 +53,7 @@ func (w Warmup) Run(hasHttpRequests bool, hasGrpcRequests bool, requestsSentCoun log.Printf("gRPC client connect error: %v", connErr) } else { for i := 1; i <= w.Concurrency; i++ { + waitForRampUp(rampUpInterval, i) log.Printf("Spawning new go routine for gRPC requests") wg.Add(1) go safe.Do(func() { @@ -62,6 +65,7 @@ func (w Warmup) Run(hasHttpRequests bool, hasGrpcRequests bool, requestsSentCoun if hasHttpRequests { for i := 1; i <= w.Concurrency; i++ { + waitForRampUp(rampUpInterval, i) log.Printf("Spawning new go routine for HTTP requests") wg.Add(1) go safe.Do(func() { @@ -112,3 +116,9 @@ func (w Warmup) GrpcWarmupWorker(wg *sync.WaitGroup, requests <-chan grpc.Reques } wg.Done() } + +func waitForRampUp(rampUpInterval int, currentConcurrency int) { + if currentConcurrency > 1 && rampUpInterval > 0 { + time.Sleep(time.Duration(rampUpInterval) * time.Second) + } +} diff --git a/test/root_test.go b/test/root_test.go index 4f897f4..b7c491d 100644 --- a/test/root_test.go +++ b/test/root_test.go @@ -172,6 +172,7 @@ func TestGrpcAndHttp(t *testing.T) { "-exit-after-warmup=true", "-target-readiness-http-path=/health", "-max-duration-seconds=2", + "-concurrency-target-seconds=1", } cmd.CreateConfig()