contrib: make ekswarmup tunable #111

Merged · 3 commits · Apr 1, 2024
10 changes: 8 additions & 2 deletions contrib/cmd/runkperf/commands/bench/node100_job1_pod3k.go
@@ -33,7 +33,7 @@ that nodes. It repeats to create and delete job. The load profile is fixed.
ctx := context.Background()
kubeCfgPath := cliCtx.GlobalString("kubeconfig")

rgCfgFile, rgCfgFileDone, err := newLoadProfileFromEmbed(
rgCfgFile, rgCfgFileDone, err := utils.NewLoadProfileFromEmbed(
"loadprofile/node100_job1_pod3k.yaml",
func(spec *types.RunnerGroupSpec) error {
reqs := cliCtx.Int("total-requests")
@@ -77,7 +77,13 @@ that nodes. It repeats to create and delete job. The load profile is fixed.
}()

// TODO(weifu): write result into file
derr := deployRunnerGroup(ctx, cliCtx, rgCfgFile)
_, derr := utils.DeployRunnerGroup(ctx,
cliCtx.GlobalString("kubeconfig"),
cliCtx.GlobalString("runner-image"),
rgCfgFile,
cliCtx.GlobalString("runner-flowcontrol"),
cliCtx.GlobalString("rg-affinity"),
)
jobCancel()
wg.Wait()
return derr
77 changes: 0 additions & 77 deletions contrib/cmd/runkperf/commands/bench/root.go
@@ -3,16 +3,12 @@ package bench
import (
"context"
"fmt"
"time"

"github.com/Azure/kperf/api/types"
kperfcmdutils "github.com/Azure/kperf/cmd/kperf/commands/utils"
"github.com/Azure/kperf/contrib/internal/manifests"
"github.com/Azure/kperf/contrib/internal/utils"
"k8s.io/klog/v2"

"github.com/urfave/cli"
"gopkg.in/yaml.v2"
)

// Command represents bench subcommand.
@@ -95,76 +91,3 @@ func deployVirtualNodepool(ctx context.Context, cliCtx *cli.Context, target string
return kr.DeleteNodepool(ctx, 0, target)
}, nil
}

// deployRunnerGroup deploys runner group for benchmark.
func deployRunnerGroup(ctx context.Context, cliCtx *cli.Context, rgCfgFile string) error {
klog.V(0).InfoS("Deploying runner group", "config", rgCfgFile)

kubeCfgPath := cliCtx.GlobalString("kubeconfig")
runnerImage := cliCtx.GlobalString("runner-image")

kr := utils.NewKperfRunner(kubeCfgPath, runnerImage)

klog.V(0).Info("Deleting existing runner group")
derr := kr.RGDelete(ctx, 0)
if derr != nil {
klog.V(0).ErrorS(derr, "failed to delete existing runner group")
}

runnerFlowControl := cliCtx.GlobalString("runner-flowcontrol")
runnerGroupAffinity := cliCtx.GlobalString("rg-affinity")

rerr := kr.RGRun(ctx, 0, rgCfgFile, runnerFlowControl, runnerGroupAffinity)
if rerr != nil {
return fmt.Errorf("failed to deploy runner group: %w", rerr)
}

klog.V(0).Info("Waiting runner group")
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

res, err := kr.RGResult(ctx, 1*time.Minute)
if err != nil {
klog.V(0).ErrorS(err, "failed to fetch warmup runner group's result")
continue
}
klog.V(0).InfoS("Runner group's result", "data", res)

klog.V(0).Info("Deleting runner group")
if derr := kr.RGDelete(ctx, 0); derr != nil {
klog.V(0).ErrorS(err, "failed to delete runner group")
}
return nil
}
}

// newLoadProfileFromEmbed reads load profile from embed memory.
func newLoadProfileFromEmbed(target string, tweakFn func(*types.RunnerGroupSpec) error) (_name string, _cleanup func() error, _ error) {
data, err := manifests.FS.ReadFile(target)
if err != nil {
return "", nil, fmt.Errorf("unexpected error when read %s from embed memory: %v", target, err)
}

if tweakFn != nil {
var spec types.RunnerGroupSpec
if err = yaml.Unmarshal(data, &spec); err != nil {
return "", nil, fmt.Errorf("failed to unmarshal into runner group spec:\n (data: %s)\n: %w",
string(data), err)
}

if err = tweakFn(&spec); err != nil {
return "", nil, err
}

data, err = yaml.Marshal(spec)
if err != nil {
return "", nil, fmt.Errorf("failed to marshal runner group spec after tweak: %w", err)
}
}

return utils.CreateTempFileWithContent(data)
}
106 changes: 50 additions & 56 deletions contrib/cmd/runkperf/commands/ekswarmup/command.go
@@ -8,10 +8,11 @@ import (
"sync"
"time"

"github.com/Azure/kperf/contrib/internal/manifests"
"github.com/Azure/kperf/api/types"
"github.com/Azure/kperf/contrib/internal/utils"

"github.com/urfave/cli"
"gopkg.in/yaml.v2"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
@@ -38,12 +39,52 @@ var Command = cli.Command{
// Right now, we need to set image manually.
Required: true,
},
cli.StringFlag{
Name: "runner-flowcontrol",
Usage: "Apply flowcontrol to runner group. (FORMAT: PriorityLevel:MatchingPrecedence)",
Value: "workload-low:1000",
},
cli.Float64Flag{
Name: "rate",
Usage: "The maximum requests per second per runner (There are 10 runners totally)",
Value: 20,
},
cli.IntFlag{
Name: "total",
Usage: "Total requests per runner (There are 10 runners totally and runner's rate is 20)",
Value: 10000,
},
},
Action: func(cliCtx *cli.Context) (retErr error) {
ctx := context.Background()

rgCfgFile, rgCfgFileDone, err := utils.NewLoadProfileFromEmbed(
"loadprofile/ekswarmup.yaml",
func(spec *types.RunnerGroupSpec) error {
reqs := cliCtx.Int("total")
if reqs < 0 {
return fmt.Errorf("invalid total value: %v", reqs)
}

rate := cliCtx.Float64("rate")
if rate <= 0 {
return fmt.Errorf("invalid rate value: %v", rate)
}

spec.Profile.Spec.Total = reqs
spec.Profile.Spec.Rate = rate

data, _ := yaml.Marshal(spec)
klog.V(2).InfoS("Load Profile", "config", string(data))
return nil
},
)
if err != nil {
return err
}
defer func() { _ = rgCfgFileDone() }()

kubeCfgPath := cliCtx.String("kubeconfig")
runnerImage := cliCtx.String("runner-image")

perr := patchEKSDaemonsetWithoutToleration(ctx, kubeCfgPath)
if perr != nil {
@@ -81,7 +122,13 @@ var Command = cli.Command{
utils.RepeatJobWith3KPod(jobCtx, kubeCfgPath, "warmupjob", 5*time.Second)
}()

derr := deployWarmupRunnerGroup(ctx, kubeCfgPath, runnerImage)
_, derr := utils.DeployRunnerGroup(ctx,
kubeCfgPath,
cliCtx.String("runner-image"),
rgCfgFile,
cliCtx.String("runner-flowcontrol"),
"",
)
jobCancel()
wg.Wait()

@@ -134,59 +181,6 @@ func deployWarmupVirtualNodepool(ctx context.Context, kubeCfgPath string) (func(
}, nil
}

// deployWarmupRunnerGroup deploys warmup runner group to trigger resource update.
func deployWarmupRunnerGroup(ctx context.Context, kubeCfgPath string, runnerImage string) error {
klog.V(0).Info("Deploying warmup runner group")

target := "loadprofile/ekswarmup.yaml"

data, err := manifests.FS.ReadFile(target)
if err != nil {
return fmt.Errorf("failed to read %s from embed memory: %w", target, err)
}

rgCfgFile, cleanup, err := utils.CreateTempFileWithContent(data)
if err != nil {
return fmt.Errorf("failed to create temporary file for %s: %w", target, err)
}
defer func() { _ = cleanup() }()

kr := utils.NewKperfRunner(kubeCfgPath, runnerImage)

klog.V(0).Info("Deleting existing runner group")
derr := kr.RGDelete(ctx, 0)
if derr != nil {
klog.V(0).ErrorS(derr, "failed to delete existing runner group")
}

rerr := kr.RGRun(ctx, 0, rgCfgFile, "workload-low:1000", "")
if rerr != nil {
return fmt.Errorf("failed to deploy warmup runner group: %w", rerr)
}

klog.V(0).Info("Waiting warmup runner group")
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

res, err := kr.RGResult(ctx, 1*time.Minute)
if err != nil {
klog.V(0).ErrorS(err, "failed to fetch warmup runner group's result")
continue
}
klog.V(0).Infof("Warmup runner group's result: %s", res)

klog.V(0).Info("Deleting warmup runner group")
if derr := kr.RGDelete(ctx, 0); derr != nil {
klog.V(0).ErrorS(err, "failed to delete existing runner group")
}
return nil
}
}

// patchEKSDaemonsetWithoutToleration removes tolerations to avoid pod scheduled
// to virtual nodes.
func patchEKSDaemonsetWithoutToleration(ctx context.Context, kubeCfgPath string) error {
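Taken together, the new flags make the warmup load tunable from the command line. A hypothetical invocation (the subcommand name is inferred from the package path, and the image reference is a placeholder): `runkperf ekswarmup --kubeconfig ~/.kube/config --runner-image example.com/kperf:latest --rate 10 --total 5000 --runner-flowcontrol workload-low:1000`.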
72 changes: 72 additions & 0 deletions contrib/internal/utils/utils.go
@@ -11,8 +11,10 @@ import (
"syscall"
"time"

"github.com/Azure/kperf/api/types"
"github.com/Azure/kperf/contrib/internal/manifests"

"gopkg.in/yaml.v2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
@@ -102,6 +104,76 @@ func RepeatJobWith3KPod(ctx context.Context, kubeCfgPath string, namespace string
}
}

// NewLoadProfileFromEmbed reads a load profile from embedded memory.
func NewLoadProfileFromEmbed(target string, tweakFn func(*types.RunnerGroupSpec) error) (_name string, _cleanup func() error, _ error) {
data, err := manifests.FS.ReadFile(target)
if err != nil {
return "", nil, fmt.Errorf("unexpected error when read %s from embed memory: %v", target, err)
}

if tweakFn != nil {
var spec types.RunnerGroupSpec
if err = yaml.Unmarshal(data, &spec); err != nil {
return "", nil, fmt.Errorf("failed to unmarshal into runner group spec:\n (data: %s)\n: %w",
string(data), err)
}

if err = tweakFn(&spec); err != nil {
return "", nil, err
}

data, err = yaml.Marshal(spec)
if err != nil {
return "", nil, fmt.Errorf("failed to marshal runner group spec after tweak: %w", err)
}
}

return CreateTempFileWithContent(data)
}

// DeployRunnerGroup deploys a runner group for the benchmark and waits for its result.
func DeployRunnerGroup(ctx context.Context,
kubeCfgPath, runnerImage, rgCfgFile string,
runnerFlowControl, runnerGroupAffinity string) (string, error) {

klog.V(0).InfoS("Deploying runner group", "config", rgCfgFile)

kr := NewKperfRunner(kubeCfgPath, runnerImage)

klog.V(0).Info("Deleting existing runner group")
derr := kr.RGDelete(ctx, 0)
if derr != nil {
klog.V(0).ErrorS(derr, "failed to delete existing runner group")
}

rerr := kr.RGRun(ctx, 0, rgCfgFile, runnerFlowControl, runnerGroupAffinity)
if rerr != nil {
return "", fmt.Errorf("failed to deploy runner group: %w", rerr)
}

klog.V(0).Info("Waiting runner group")
for {
select {
case <-ctx.Done():
return "", ctx.Err()
default:
}

res, err := kr.RGResult(ctx, 1*time.Minute)
if err != nil {
klog.V(0).ErrorS(err, "failed to fetch warmup runner group's result")
continue
}
klog.V(0).InfoS("Runner group's result", "data", res)

klog.V(0).Info("Deleting runner group")
if derr := kr.RGDelete(ctx, 0); derr != nil {
klog.V(0).ErrorS(err, "failed to delete runner group")
}
return res, nil
}
}

// FetchNodeProviderIDByType is used to get one node's provider id with a given
// instance type.
func FetchNodeProviderIDByType(ctx context.Context, kubeCfgPath string, instanceType string) (string, error) {
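The two exported helpers are now the shared deploy path for both subcommands. Below is a minimal sketch of a caller, assuming it lives inside the contrib module (the utils package is internal) and reusing the embedded ekswarmup profile and default values shown in the diff above:

```go
// Sketch only: wires NewLoadProfileFromEmbed and DeployRunnerGroup together
// the way the bench and ekswarmup commands do after this change.
package main

import (
	"context"
	"fmt"

	"github.com/Azure/kperf/api/types"
	"github.com/Azure/kperf/contrib/internal/utils"
)

func run(ctx context.Context, kubeCfgPath, runnerImage string) error {
	// Render the embedded profile to a temp file, overriding rate/total first.
	rgCfgFile, rgCfgFileDone, err := utils.NewLoadProfileFromEmbed(
		"loadprofile/ekswarmup.yaml",
		func(spec *types.RunnerGroupSpec) error {
			spec.Profile.Spec.Rate = 20
			spec.Profile.Spec.Total = 10000
			return nil
		},
	)
	if err != nil {
		return err
	}
	defer func() { _ = rgCfgFileDone() }()

	// Deploy the runner group and wait for its result. An empty affinity
	// string means no node-affinity constraint, as in the ekswarmup caller.
	res, err := utils.DeployRunnerGroup(ctx,
		kubeCfgPath, runnerImage, rgCfgFile,
		"workload-low:1000", "",
	)
	if err != nil {
		return err
	}
	fmt.Println(res)
	return nil
}

func main() {
	// Placeholder values; a real caller would read these from CLI flags.
	if err := run(context.Background(), "/home/user/.kube/config", "example.com/kperf:latest"); err != nil {
		fmt.Println(err)
	}
}
```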