diff --git a/contrib/cmd/runkperf/commands/bench/node100_job1_pod3k.go b/contrib/cmd/runkperf/commands/bench/node100_job1_pod3k.go
index 4dca839..96162f7 100644
--- a/contrib/cmd/runkperf/commands/bench/node100_job1_pod3k.go
+++ b/contrib/cmd/runkperf/commands/bench/node100_job1_pod3k.go
@@ -33,7 +33,7 @@ that nodes. It repeats to create and delete job. The load profile is fixed.
 		ctx := context.Background()
 		kubeCfgPath := cliCtx.GlobalString("kubeconfig")
 
-		rgCfgFile, rgCfgFileDone, err := newLoadProfileFromEmbed(
+		rgCfgFile, rgCfgFileDone, err := utils.NewLoadProfileFromEmbed(
 			"loadprofile/node100_job1_pod3k.yaml",
 			func(spec *types.RunnerGroupSpec) error {
 				reqs := cliCtx.Int("total-requests")
@@ -77,7 +77,13 @@ that nodes. It repeats to create and delete job. The load profile is fixed.
 		}()
 
 		// TODO(weifu): write result into file
-		derr := deployRunnerGroup(ctx, cliCtx, rgCfgFile)
+		_, derr := utils.DeployRunnerGroup(ctx,
+			cliCtx.GlobalString("kubeconfig"),
+			cliCtx.GlobalString("runner-image"),
+			rgCfgFile,
+			cliCtx.GlobalString("runner-flowcontrol"),
+			cliCtx.GlobalString("rg-affinity"),
+		)
 		jobCancel()
 		wg.Wait()
 		return derr
diff --git a/contrib/cmd/runkperf/commands/bench/root.go b/contrib/cmd/runkperf/commands/bench/root.go
index df3906c..a3a70ec 100644
--- a/contrib/cmd/runkperf/commands/bench/root.go
+++ b/contrib/cmd/runkperf/commands/bench/root.go
@@ -3,16 +3,12 @@ package bench
 import (
 	"context"
 	"fmt"
-	"time"
 
-	"github.com/Azure/kperf/api/types"
 	kperfcmdutils "github.com/Azure/kperf/cmd/kperf/commands/utils"
-	"github.com/Azure/kperf/contrib/internal/manifests"
 	"github.com/Azure/kperf/contrib/internal/utils"
 	"k8s.io/klog/v2"
 
 	"github.com/urfave/cli"
-	"gopkg.in/yaml.v2"
 )
 
 // Command represents bench subcommand.
@@ -95,76 +91,3 @@ func deployVirtualNodepool(ctx context.Context, cliCtx *cli.Context, target stri
 		return kr.DeleteNodepool(ctx, 0, target)
 	}, nil
 }
-
-// deployRunnerGroup deploys runner group for benchmark.
-func deployRunnerGroup(ctx context.Context, cliCtx *cli.Context, rgCfgFile string) error {
-	klog.V(0).InfoS("Deploying runner group", "config", rgCfgFile)
-
-	kubeCfgPath := cliCtx.GlobalString("kubeconfig")
-	runnerImage := cliCtx.GlobalString("runner-image")
-
-	kr := utils.NewKperfRunner(kubeCfgPath, runnerImage)
-
-	klog.V(0).Info("Deleting existing runner group")
-	derr := kr.RGDelete(ctx, 0)
-	if derr != nil {
-		klog.V(0).ErrorS(derr, "failed to delete existing runner group")
-	}
-
-	runnerFlowControl := cliCtx.GlobalString("runner-flowcontrol")
-	runnerGroupAffinity := cliCtx.GlobalString("rg-affinity")
-
-	rerr := kr.RGRun(ctx, 0, rgCfgFile, runnerFlowControl, runnerGroupAffinity)
-	if rerr != nil {
-		return fmt.Errorf("failed to deploy runner group: %w", rerr)
-	}
-
-	klog.V(0).Info("Waiting runner group")
-	for {
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-		default:
-		}
-
-		res, err := kr.RGResult(ctx, 1*time.Minute)
-		if err != nil {
-			klog.V(0).ErrorS(err, "failed to fetch warmup runner group's result")
-			continue
-		}
-		klog.V(0).InfoS("Runner group's result", "data", res)
-
-		klog.V(0).Info("Deleting runner group")
-		if derr := kr.RGDelete(ctx, 0); derr != nil {
-			klog.V(0).ErrorS(err, "failed to delete runner group")
-		}
-		return nil
-	}
-}
-
-// newLoadProfileFromEmbed reads load profile from embed memory.
-func newLoadProfileFromEmbed(target string, tweakFn func(*types.RunnerGroupSpec) error) (_name string, _cleanup func() error, _ error) {
-	data, err := manifests.FS.ReadFile(target)
-	if err != nil {
-		return "", nil, fmt.Errorf("unexpected error when read %s from embed memory: %v", target, err)
-	}
-
-	if tweakFn != nil {
-		var spec types.RunnerGroupSpec
-		if err = yaml.Unmarshal(data, &spec); err != nil {
-			return "", nil, fmt.Errorf("failed to unmarshal into runner group spec:\n (data: %s)\n: %w",
-				string(data), err)
-		}
-
-		if err = tweakFn(&spec); err != nil {
-			return "", nil, err
-		}
-
-		data, err = yaml.Marshal(spec)
-		if err != nil {
-			return "", nil, fmt.Errorf("failed to marshal runner group spec after tweak: %w", err)
-		}
-	}
-
-	return utils.CreateTempFileWithContent(data)
-}
diff --git a/contrib/cmd/runkperf/commands/ekswarmup/command.go b/contrib/cmd/runkperf/commands/ekswarmup/command.go
index 63d5ed3..683ea1d 100644
--- a/contrib/cmd/runkperf/commands/ekswarmup/command.go
+++ b/contrib/cmd/runkperf/commands/ekswarmup/command.go
@@ -8,10 +8,11 @@ import (
 	"sync"
 	"time"
 
-	"github.com/Azure/kperf/contrib/internal/manifests"
+	"github.com/Azure/kperf/api/types"
 	"github.com/Azure/kperf/contrib/internal/utils"
 
 	"github.com/urfave/cli"
+	"gopkg.in/yaml.v2"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
@@ -38,12 +39,52 @@ var Command = cli.Command{
 			// Right now, we need to set image manually.
 			Required: true,
 		},
+		cli.StringFlag{
+			Name:  "runner-flowcontrol",
+			Usage: "Apply flowcontrol to runner group. (FORMAT: PriorityLevel:MatchingPrecedence)",
+			Value: "workload-low:1000",
+		},
+		cli.Float64Flag{
+			Name:  "rate",
+			Usage: "The maximum requests per second per runner (There are 10 runners totally)",
+			Value: 20,
+		},
+		cli.IntFlag{
+			Name:  "total",
+			Usage: "Total requests per runner (There are 10 runners totally and runner's rate is 20)",
+			Value: 10000,
+		},
 	},
 	Action: func(cliCtx *cli.Context) (retErr error) {
 		ctx := context.Background()
 
+		rgCfgFile, rgCfgFileDone, err := utils.NewLoadProfileFromEmbed(
+			"loadprofile/ekswarmup.yaml",
+			func(spec *types.RunnerGroupSpec) error {
+				reqs := cliCtx.Int("total")
+				if reqs < 0 {
+					return fmt.Errorf("invalid total value: %v", reqs)
+				}
+
+				rate := cliCtx.Float64("rate")
+				if rate <= 0 {
+					return fmt.Errorf("invalid rate value: %v", rate)
+				}
+
+				spec.Profile.Spec.Total = reqs
+				spec.Profile.Spec.Rate = rate
+
+				data, _ := yaml.Marshal(spec)
+				klog.V(2).InfoS("Load Profile", "config", string(data))
+				return nil
+			},
+		)
+		if err != nil {
+			return err
+		}
+		defer func() { _ = rgCfgFileDone() }()
+
 		kubeCfgPath := cliCtx.String("kubeconfig")
-		runnerImage := cliCtx.String("runner-image")
 
 		perr := patchEKSDaemonsetWithoutToleration(ctx, kubeCfgPath)
 		if perr != nil {
@@ -81,7 +122,13 @@ var Command = cli.Command{
 			utils.RepeatJobWith3KPod(jobCtx, kubeCfgPath, "warmupjob", 5*time.Second)
 		}()
 
-		derr := deployWarmupRunnerGroup(ctx, kubeCfgPath, runnerImage)
+		_, derr := utils.DeployRunnerGroup(ctx,
+			kubeCfgPath,
+			cliCtx.String("runner-image"),
+			rgCfgFile,
+			cliCtx.String("runner-flowcontrol"),
+			"",
+		)
 		jobCancel()
 		wg.Wait()
 
@@ -134,59 +181,6 @@ func deployWarmupVirtualNodepool(ctx context.Context, kubeCfgPath string) (func(
 	}, nil
 }
 
-// deployWarmupRunnerGroup deploys warmup runner group to trigger resource update.
-func deployWarmupRunnerGroup(ctx context.Context, kubeCfgPath string, runnerImage string) error {
-	klog.V(0).Info("Deploying warmup runner group")
-
-	target := "loadprofile/ekswarmup.yaml"
-
-	data, err := manifests.FS.ReadFile(target)
-	if err != nil {
-		return fmt.Errorf("failed to read %s from embed memory: %w", target, err)
-	}
-
-	rgCfgFile, cleanup, err := utils.CreateTempFileWithContent(data)
-	if err != nil {
-		return fmt.Errorf("failed to create temporary file for %s: %w", target, err)
-	}
-	defer func() { _ = cleanup() }()
-
-	kr := utils.NewKperfRunner(kubeCfgPath, runnerImage)
-
-	klog.V(0).Info("Deleting existing runner group")
-	derr := kr.RGDelete(ctx, 0)
-	if derr != nil {
-		klog.V(0).ErrorS(derr, "failed to delete existing runner group")
-	}
-
-	rerr := kr.RGRun(ctx, 0, rgCfgFile, "workload-low:1000", "")
-	if rerr != nil {
-		return fmt.Errorf("failed to deploy warmup runner group: %w", rerr)
-	}
-
-	klog.V(0).Info("Waiting warmup runner group")
-	for {
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-		default:
-		}
-
-		res, err := kr.RGResult(ctx, 1*time.Minute)
-		if err != nil {
-			klog.V(0).ErrorS(err, "failed to fetch warmup runner group's result")
-			continue
-		}
-		klog.V(0).Infof("Warmup runner group's result: %s", res)
-
-		klog.V(0).Info("Deleting warmup runner group")
-		if derr := kr.RGDelete(ctx, 0); derr != nil {
-			klog.V(0).ErrorS(err, "failed to delete existing runner group")
-		}
-		return nil
-	}
-}
-
 // patchEKSDaemonsetWithoutToleration removes tolerations to avoid pod scheduled
 // to virtual nodes.
 func patchEKSDaemonsetWithoutToleration(ctx context.Context, kubeCfgPath string) error {
diff --git a/contrib/internal/utils/utils.go b/contrib/internal/utils/utils.go
index d7ddddc..74d40d3 100644
--- a/contrib/internal/utils/utils.go
+++ b/contrib/internal/utils/utils.go
@@ -11,8 +11,10 @@ import (
 	"syscall"
 	"time"
 
+	"github.com/Azure/kperf/api/types"
 	"github.com/Azure/kperf/contrib/internal/manifests"
 
+	"gopkg.in/yaml.v2"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/clientcmd"
@@ -102,6 +104,76 @@ func RepeatJobWith3KPod(ctx context.Context, kubeCfgPath string, namespace strin
 	}
 }
 
+// NewLoadProfileFromEmbed reads load profile from embed memory.
+func NewLoadProfileFromEmbed(target string, tweakFn func(*types.RunnerGroupSpec) error) (_name string, _cleanup func() error, _ error) {
+	data, err := manifests.FS.ReadFile(target)
+	if err != nil {
+		return "", nil, fmt.Errorf("unexpected error when read %s from embed memory: %v", target, err)
+	}
+
+	if tweakFn != nil {
+		var spec types.RunnerGroupSpec
+		if err = yaml.Unmarshal(data, &spec); err != nil {
+			return "", nil, fmt.Errorf("failed to unmarshal into runner group spec:\n (data: %s)\n: %w",
+				string(data), err)
+		}
+
+		if err = tweakFn(&spec); err != nil {
+			return "", nil, err
+		}
+
+		data, err = yaml.Marshal(spec)
+		if err != nil {
+			return "", nil, fmt.Errorf("failed to marshal runner group spec after tweak: %w", err)
+		}
+	}
+
+	return CreateTempFileWithContent(data)
+}
+
+// DeployRunnerGroup deploys runner group for benchmark.
+func DeployRunnerGroup(ctx context.Context,
+	kubeCfgPath, runnerImage, rgCfgFile string,
+	runnerFlowControl, runnerGroupAffinity string) (string, error) {
+
+	klog.V(0).InfoS("Deploying runner group", "config", rgCfgFile)
+
+	kr := NewKperfRunner(kubeCfgPath, runnerImage)
+
+	klog.V(0).Info("Deleting existing runner group")
+	derr := kr.RGDelete(ctx, 0)
+	if derr != nil {
+		klog.V(0).ErrorS(derr, "failed to delete existing runner group")
+	}
+
+	rerr := kr.RGRun(ctx, 0, rgCfgFile, runnerFlowControl, runnerGroupAffinity)
+	if rerr != nil {
+		return "", fmt.Errorf("failed to deploy runner group: %w", rerr)
+	}
+
+	klog.V(0).Info("Waiting runner group")
+	for {
+		select {
+		case <-ctx.Done():
+			return "", ctx.Err()
+		default:
+		}
+
+		res, err := kr.RGResult(ctx, 1*time.Minute)
+		if err != nil {
+			klog.V(0).ErrorS(err, "failed to fetch runner group's result")
+			continue
+		}
+		klog.V(0).InfoS("Runner group's result", "data", res)
+
+		klog.V(0).Info("Deleting runner group")
+		if derr := kr.RGDelete(ctx, 0); derr != nil {
+			klog.V(0).ErrorS(derr, "failed to delete runner group")
+		}
+		return res, nil
+	}
+}
+
 // FetchNodeProviderIDByType is used to get one node's provider id with a given
 // instance type.
 func FetchNodeProviderIDByType(ctx context.Context, kubeCfgPath string, instanceType string) (string, error) {
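For reference (not part of the patch): a minimal sketch of how a command inside this repo might wire the two relocated helpers together after this change. The package name, profile tweaks, and flag values below are illustrative assumptions; only the helper signatures come from the diff above.

```go
// Hypothetical caller, for illustration only.
package example

import (
	"context"
	"fmt"

	"github.com/Azure/kperf/api/types"
	"github.com/Azure/kperf/contrib/internal/utils"
)

func runBenchmark(ctx context.Context, kubeCfgPath, runnerImage string) error {
	// Render an embedded load profile into a temp file, tweaking it first.
	rgCfgFile, cleanup, err := utils.NewLoadProfileFromEmbed(
		"loadprofile/node100_job1_pod3k.yaml", // profile embedded in contrib/internal/manifests
		func(spec *types.RunnerGroupSpec) error {
			spec.Profile.Spec.Total = 36000 // illustrative request budget
			spec.Profile.Spec.Rate = 10     // illustrative per-runner rate
			return nil
		},
	)
	if err != nil {
		return err
	}
	defer func() { _ = cleanup() }()

	// Deploy the runner group and block until its result is available.
	// An empty affinity string means no runner-group affinity is applied.
	res, err := utils.DeployRunnerGroup(ctx,
		kubeCfgPath,
		runnerImage,
		rgCfgFile,
		"workload-low:1000", // runner-flowcontrol: PriorityLevel:MatchingPrecedence
		"",                  // rg-affinity
	)
	if err != nil {
		return err
	}

	fmt.Println(res)
	return nil
}
```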