Skip to content

Commit

Permalink
Add support for concurrency ramp up (#202)
Browse files Browse the repository at this point in the history
* Add support for concurrency ramp up
  • Loading branch information
sumitanvekar authored Nov 30, 2021
1 parent 688b80b commit 74d5fe8
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 0 deletions.
7 changes: 7 additions & 0 deletions cmd/flags/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Root struct {
MaxDurationSeconds int
Concurrency int
RequestDelayMilliseconds int
ConcurrencyTargetSeconds int
ExitAfterWarmup bool
FailReadiness bool
FileProbe
Expand All @@ -48,6 +49,7 @@ func (r *Root) InitFlags() {
flag.IntVar(&r.MaxDurationSeconds, "max-duration-seconds", 60, "Max duration in seconds after which warm up will stop making requests")
flag.IntVar(&r.Concurrency, "concurrency", 2, "Number of concurrent requests for warm up")
flag.IntVar(&r.RequestDelayMilliseconds, "request-delay-milliseconds", 500, "Delay in milliseconds between requests")
flag.IntVar(&r.ConcurrencyTargetSeconds, "concurrency-target-seconds", 0, "Time taken to reach expected concurrency. This is useful to ramp up traffic.")
flag.BoolVar(&r.ExitAfterWarmup, "exit-after-warmup", false, "If warm up process should finish after completion. This is useful to prevent container restarts.")
flag.BoolVar(&r.FailReadiness, "fail-readiness", false, "If set to true readiness will fail if no requests were sent.")

Expand All @@ -63,6 +65,11 @@ func (r *Root) GetMaxDurationSeconds() int {
return r.MaxDurationSeconds
}

// GetConcurrencyTargetSeconds returns the value of the concurrency-target-seconds parameter.
func (r *Root) GetConcurrencyTargetSeconds() int {
return r.ConcurrencyTargetSeconds
}

// GetConcurrency returns the value of the concurrency parameter.
func (r *Root) GetConcurrency() int {
return r.Concurrency
Expand Down
1 change: 1 addition & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func run() int {
GrpcRequests: grpcRequests,
HttpHeaders: opts.GetWarmupHTTPHeaders(),
RequestDelayMilliseconds: opts.RequestDelayMilliseconds,
ConcurrencyTargetSeconds: opts.GetConcurrencyTargetSeconds(),
}
wp.Run(hasHttpRequests, hasGrpcRequests, &requestsSentCounter)
} else {
Expand Down
1 change: 1 addition & 0 deletions docs/about/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ The application receives a number of command-line flags including the requests t
| -target-readiness-port | int | same as -target-http-port | The port used for target readiness probe |
| -target-readiness-protocol | string | http | Protocol to be used for readiness check. One of [`http`, `grpc`] |
| -max-duration-seconds | int | 60 | Maximum duration in seconds after which warm up will stop making requests |
| -concurrency-target-seconds | int | 0 | Time taken to reach expected concurrency. This is useful to ramp up traffic. |

### Warmup request
A warmup request can be an HTTP one (over REST) or a gRPC one.
Expand Down
10 changes: 10 additions & 0 deletions internal/pkg/warmup/warmup.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ type Warmup struct {
HttpHeaders []string
GrpcRequests chan grpc.Request
RequestDelayMilliseconds int
ConcurrencyTargetSeconds int
}

// Run sends requests to the target using goroutines.
func (w Warmup) Run(hasHttpRequests bool, hasGrpcRequests bool, requestsSentCounter *int) {
rand.Seed(time.Now().UnixNano()) // initialize seed only once to prevent deterministic/repeated calls every time we run

var wg sync.WaitGroup
var rampUpInterval = w.ConcurrencyTargetSeconds / w.Concurrency

if hasGrpcRequests {
// connect to gRPC server once and only if there are gRPC requests
Expand All @@ -51,6 +53,7 @@ func (w Warmup) Run(hasHttpRequests bool, hasGrpcRequests bool, requestsSentCoun
log.Printf("gRPC client connect error: %v", connErr)
} else {
for i := 1; i <= w.Concurrency; i++ {
waitForRampUp(rampUpInterval, i)
log.Printf("Spawning new go routine for gRPC requests")
wg.Add(1)
go safe.Do(func() {
Expand All @@ -62,6 +65,7 @@ func (w Warmup) Run(hasHttpRequests bool, hasGrpcRequests bool, requestsSentCoun

if hasHttpRequests {
for i := 1; i <= w.Concurrency; i++ {
waitForRampUp(rampUpInterval, i)
log.Printf("Spawning new go routine for HTTP requests")
wg.Add(1)
go safe.Do(func() {
Expand Down Expand Up @@ -112,3 +116,9 @@ func (w Warmup) GrpcWarmupWorker(wg *sync.WaitGroup, requests <-chan grpc.Reques
}
wg.Done()
}

func waitForRampUp(rampUpInterval int, currentConcurrency int) {
if currentConcurrency > 1 && rampUpInterval > 0 {
time.Sleep(time.Duration(rampUpInterval) * time.Second)
}
}
1 change: 1 addition & 0 deletions test/root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func TestGrpcAndHttp(t *testing.T) {
"-exit-after-warmup=true",
"-target-readiness-http-path=/health",
"-max-duration-seconds=2",
"-concurrency-target-seconds=1",
}

cmd.CreateConfig()
Expand Down

0 comments on commit 74d5fe8

Please sign in to comment.