diff --git a/contrib/cmd/runkperf/commands/bench/node100_job1_pod3k.go b/contrib/cmd/runkperf/commands/bench/node100_job1_pod3k.go index 96162f7..0482933 100644 --- a/contrib/cmd/runkperf/commands/bench/node100_job1_pod3k.go +++ b/contrib/cmd/runkperf/commands/bench/node100_job1_pod3k.go @@ -8,6 +8,7 @@ import ( "github.com/Azure/kperf/api/types" kperfcmdutils "github.com/Azure/kperf/cmd/kperf/commands/utils" + internaltypes "github.com/Azure/kperf/contrib/internal/types" "github.com/Azure/kperf/contrib/internal/utils" "github.com/urfave/cli" @@ -30,62 +31,77 @@ that nodes. It repeats to create and delete job. The load profile is fixed. }, }, Action: func(cliCtx *cli.Context) error { - ctx := context.Background() - kubeCfgPath := cliCtx.GlobalString("kubeconfig") - - rgCfgFile, rgCfgFileDone, err := utils.NewLoadProfileFromEmbed( - "loadprofile/node100_job1_pod3k.yaml", - func(spec *types.RunnerGroupSpec) error { - reqs := cliCtx.Int("total-requests") - if reqs < 0 { - return fmt.Errorf("invalid total-requests value: %v", reqs) - } - - rgAffinity := cliCtx.GlobalString("rg-affinity") - affinityLabels, err := kperfcmdutils.KeyValuesMap([]string{rgAffinity}) - if err != nil { - return fmt.Errorf("failed to parse %s affinity: %w", rgAffinity, err) - } - - spec.Profile.Spec.Total = reqs - spec.NodeAffinity = affinityLabels - - data, _ := yaml.Marshal(spec) - klog.V(2).InfoS("Load Profile", "config", string(data)) - return nil - }, - ) - if err != nil { - return err - } - defer func() { _ = rgCfgFileDone() }() - - vcDone, err := deployVirtualNodepool(ctx, cliCtx, "node100job1pod3k", 100, 110) - if err != nil { - return fmt.Errorf("failed to deploy virtual node: %w", err) - } - defer func() { _ = vcDone() }() - - var wg sync.WaitGroup - wg.Add(1) - - jobCtx, jobCancel := context.WithCancel(ctx) - go func() { - defer wg.Done() - - utils.RepeatJobWith3KPod(jobCtx, kubeCfgPath, "job1pod3k", 5*time.Second) - }() - - // TODO(weifu): write result into file - _, derr := utils.DeployRunnerGroup(ctx, - cliCtx.GlobalString("kubeconfig"), - cliCtx.GlobalString("runner-image"), - rgCfgFile, - cliCtx.GlobalString("runner-flowcontrol"), - cliCtx.GlobalString("rg-affinity"), - ) - jobCancel() - wg.Wait() - return derr + _, err := renderBenchmarkReportInterceptor( + addAPIServerCoresInfoInterceptor(benchNode100Job1Pod3KCaseRun), + )(cliCtx) + return err }, } + +// benchNode100Job1Pod3KCaseRun is for benchNode100Job1Pod3KCase subcommand. 
+func benchNode100Job1Pod3KCaseRun(cliCtx *cli.Context) (*internaltypes.BenchmarkReport, error) { + ctx := context.Background() + kubeCfgPath := cliCtx.GlobalString("kubeconfig") + + rgCfgFile, rgCfgFileDone, err := utils.NewLoadProfileFromEmbed( + "loadprofile/node100_job1_pod3k.yaml", + func(spec *types.RunnerGroupSpec) error { + reqs := cliCtx.Int("total-requests") + if reqs < 0 { + return fmt.Errorf("invalid total-requests value: %v", reqs) + } + + rgAffinity := cliCtx.GlobalString("rg-affinity") + affinityLabels, err := kperfcmdutils.KeyValuesMap([]string{rgAffinity}) + if err != nil { + return fmt.Errorf("failed to parse %s affinity: %w", rgAffinity, err) + } + + spec.Profile.Spec.Total = reqs + spec.NodeAffinity = affinityLabels + + data, _ := yaml.Marshal(spec) + klog.V(2).InfoS("Load Profile", "config", string(data)) + return nil + }, + ) + if err != nil { + return nil, err + } + defer func() { _ = rgCfgFileDone() }() + + vcDone, err := deployVirtualNodepool(ctx, cliCtx, "node100job1pod3k", 100, 110) + if err != nil { + return nil, fmt.Errorf("failed to deploy virtual node: %w", err) + } + defer func() { _ = vcDone() }() + + var wg sync.WaitGroup + wg.Add(1) + + jobCtx, jobCancel := context.WithCancel(ctx) + go func() { + defer wg.Done() + + utils.RepeatJobWith3KPod(jobCtx, kubeCfgPath, "job1pod3k", 5*time.Second) + }() + + rgResult, derr := utils.DeployRunnerGroup(ctx, + cliCtx.GlobalString("kubeconfig"), + cliCtx.GlobalString("runner-image"), + rgCfgFile, + cliCtx.GlobalString("runner-flowcontrol"), + cliCtx.GlobalString("rg-affinity"), + ) + jobCancel() + wg.Wait() + + if derr != nil { + return nil, derr + } + + return &internaltypes.BenchmarkReport{ + RunnerGroupsReport: *rgResult, + Info: make(map[string]interface{}), + }, nil +} diff --git a/contrib/cmd/runkperf/commands/bench/root.go b/contrib/cmd/runkperf/commands/bench/root.go index a3a70ec..0a7193e 100644 --- a/contrib/cmd/runkperf/commands/bench/root.go +++ b/contrib/cmd/runkperf/commands/bench/root.go @@ -1,12 +1,7 @@ package bench import ( - "context" - "fmt" - kperfcmdutils "github.com/Azure/kperf/cmd/kperf/commands/utils" - "github.com/Azure/kperf/contrib/internal/utils" - "k8s.io/klog/v2" "github.com/urfave/cli" ) @@ -51,43 +46,12 @@ var Command = cli.Command{ Usage: "Indicates the target kubernetes cluster is EKS", Hidden: true, }, + cli.StringFlag{ + Name: "result", + Usage: "Path to the file which stores results", + }, }, Subcommands: []cli.Command{ benchNode100Job1Pod3KCase, }, } - -// deployVirtualNodepool deploys virtual nodepool. 
-func deployVirtualNodepool(ctx context.Context, cliCtx *cli.Context, target string, nodes, maxPods int) (func() error, error) { - klog.V(0).InfoS("Deploying virtual nodepool", "name", target) - - kubeCfgPath := cliCtx.GlobalString("kubeconfig") - virtualNodeAffinity := cliCtx.GlobalString("vc-affinity") - - kr := utils.NewKperfRunner(kubeCfgPath, "") - - var sharedProviderID string - var err error - - if cliCtx.GlobalBool("eks") { - sharedProviderID, err = utils.FetchNodeProviderIDByType(ctx, kubeCfgPath, utils.EKSIdleNodepoolInstanceType) - if err != nil { - return nil, fmt.Errorf("failed to get EKS idle node (type: %s) providerID: %w", - utils.EKSIdleNodepoolInstanceType, err) - } - } - - klog.V(0).InfoS("Trying to delete nodepool if necessary", "name", target) - if err = kr.DeleteNodepool(ctx, 0, target); err != nil { - klog.V(0).ErrorS(err, "failed to delete nodepool", "name", target) - } - - err = kr.NewNodepool(ctx, 0, target, nodes, maxPods, virtualNodeAffinity, sharedProviderID) - if err != nil { - return nil, fmt.Errorf("failed to create nodepool %s: %w", target, err) - } - - return func() error { - return kr.DeleteNodepool(ctx, 0, target) - }, nil -} diff --git a/contrib/cmd/runkperf/commands/bench/utils.go b/contrib/cmd/runkperf/commands/bench/utils.go new file mode 100644 index 0000000..367b8a8 --- /dev/null +++ b/contrib/cmd/runkperf/commands/bench/utils.go @@ -0,0 +1,122 @@ +package bench + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + + internaltypes "github.com/Azure/kperf/contrib/internal/types" + "github.com/Azure/kperf/contrib/internal/utils" + + "github.com/urfave/cli" + "k8s.io/klog/v2" +) + +// subcmdActionFunc is to unify each subcommand's interface. They should return +// benchmark report as result. +type subcmdActionFunc func(*cli.Context) (*internaltypes.BenchmarkReport, error) + +// addAPIServerCoresInfoInterceptor adds apiserver's cores into benchmark report. +func addAPIServerCoresInfoInterceptor(handler subcmdActionFunc) subcmdActionFunc { + return func(cliCtx *cli.Context) (*internaltypes.BenchmarkReport, error) { + ctx := context.Background() + kubeCfgPath := cliCtx.GlobalString("kubeconfig") + + beforeCores, ferr := utils.FetchAPIServerCores(ctx, kubeCfgPath) + if ferr != nil { + klog.ErrorS(ferr, "failed to fetch apiserver cores") + } + + report, err := handler(cliCtx) + if err != nil { + return nil, err + } + + afterCores, ferr := utils.FetchAPIServerCores(ctx, kubeCfgPath) + if ferr != nil { + klog.ErrorS(ferr, "failed to fetch apiserver cores") + } + + report.Info["apiserver"] = map[string]interface{}{ + "cores": map[string]interface{}{ + "before": beforeCores, + "after": afterCores, + }, + } + return report, nil + } +} + +// renderBenchmarkReportInterceptor renders benchmark report into file or stdout. 
+func renderBenchmarkReportInterceptor(handler subcmdActionFunc) subcmdActionFunc { + return func(cliCtx *cli.Context) (*internaltypes.BenchmarkReport, error) { + report, err := handler(cliCtx) + if err != nil { + return nil, err + } + + outF := os.Stdout + if targetFile := cliCtx.GlobalString("result"); targetFile != "" { + targetFileDir := filepath.Dir(targetFile) + + _, err = os.Stat(targetFileDir) + if err != nil && os.IsNotExist(err) { + err = os.MkdirAll(targetFileDir, 0750) + } + if err != nil { + return nil, fmt.Errorf("failed to ensure output's dir %s: %w", targetFileDir, err) + } + + outF, err = os.Create(targetFile) + if err != nil { + return nil, err + } + defer outF.Close() + } + + encoder := json.NewEncoder(outF) + encoder.SetIndent("", " ") + + if err := encoder.Encode(report); err != nil { + return nil, fmt.Errorf("failed to encode json: %w", err) + } + return report, nil + } +} + +// deployVirtualNodepool deploys virtual nodepool. +func deployVirtualNodepool(ctx context.Context, cliCtx *cli.Context, target string, nodes, maxPods int) (func() error, error) { + klog.V(0).InfoS("Deploying virtual nodepool", "name", target) + + kubeCfgPath := cliCtx.GlobalString("kubeconfig") + virtualNodeAffinity := cliCtx.GlobalString("vc-affinity") + + kr := utils.NewKperfRunner(kubeCfgPath, "") + + var sharedProviderID string + var err error + + if cliCtx.GlobalBool("eks") { + sharedProviderID, err = utils.FetchNodeProviderIDByType(ctx, kubeCfgPath, utils.EKSIdleNodepoolInstanceType) + if err != nil { + return nil, fmt.Errorf("failed to get EKS idle node (type: %s) providerID: %w", + utils.EKSIdleNodepoolInstanceType, err) + } + } + + klog.V(0).InfoS("Trying to delete nodepool if necessary", "name", target) + if err = kr.DeleteNodepool(ctx, 0, target); err != nil { + klog.V(0).ErrorS(err, "failed to delete nodepool", "name", target) + } + + err = kr.NewNodepool(ctx, 0, target, nodes, maxPods, virtualNodeAffinity, sharedProviderID) + if err != nil { + return nil, fmt.Errorf("failed to create nodepool %s: %w", target, err) + } + + return func() error { + return kr.DeleteNodepool(ctx, 0, target) + }, nil +} diff --git a/contrib/cmd/runkperf/commands/ekswarmup/command.go b/contrib/cmd/runkperf/commands/ekswarmup/command.go index 683ea1d..ee87d12 100644 --- a/contrib/cmd/runkperf/commands/ekswarmup/command.go +++ b/contrib/cmd/runkperf/commands/ekswarmup/command.go @@ -3,8 +3,6 @@ package ekswarmup import ( "context" "fmt" - "strconv" - "strings" "sync" "time" @@ -91,7 +89,7 @@ var Command = cli.Command{ return perr } - cores, ferr := fetchAPIServerCores(ctx, kubeCfgPath) + cores, ferr := utils.FetchAPIServerCores(ctx, kubeCfgPath) if ferr == nil { if isReady(cores) { klog.V(0).Infof("apiserver resource is ready: %v", cores) @@ -132,7 +130,7 @@ var Command = cli.Command{ jobCancel() wg.Wait() - cores, ferr = fetchAPIServerCores(ctx, kubeCfgPath) + cores, ferr = utils.FetchAPIServerCores(ctx, kubeCfgPath) if ferr == nil { if isReady(cores) { klog.V(0).Infof("apiserver resource is ready: %v", cores) @@ -204,55 +202,6 @@ func patchEKSDaemonsetWithoutToleration(ctx context.Context, kubeCfgPath string) return nil } -// fetchAPIServerCores fetchs core number for each kube-apiserver. 
-func fetchAPIServerCores(ctx context.Context, kubeCfgPath string) (map[string]int, error) {
-	klog.V(0).Info("Fetching apiserver's cores")
-
-	kr := utils.NewKubectlRunner(kubeCfgPath, "")
-	fqdn, err := kr.FQDN(ctx, 0)
-	if err != nil {
-		return nil, fmt.Errorf("failed to get cluster fqdn: %w", err)
-	}
-
-	ips, nerr := utils.NSLookup(fqdn)
-	if nerr != nil {
-		return nil, fmt.Errorf("failed get dns records of fqdn %s: %w", fqdn, nerr)
-	}
-
-	res := map[string]int{}
-	for _, ip := range ips {
-		cores, err := func() (int, error) {
-			data, err := kr.Metrics(ctx, 0, fqdn, ip)
-			if err != nil {
-				return 0, fmt.Errorf("failed to get metrics for ip %s: %w", ip, err)
-			}
-
-			lines := strings.Split(string(data), "\n")
-			for _, line := range lines {
-				if strings.HasPrefix(line, "go_sched_gomaxprocs_threads") {
-					vInStr := strings.Fields(line)[1]
-					v, err := strconv.Atoi(vInStr)
-					if err != nil {
-						return 0, fmt.Errorf("failed to parse go_sched_gomaxprocs_threads %s: %w", line, err)
-					}
-					return v, nil
-				}
-			}
-			return 0, fmt.Errorf("failed to get go_sched_gomaxprocs_threads")
-		}()
-		if err != nil {
-			klog.V(0).ErrorS(err, "failed to get cores", "ip", ip)
-			continue
-		}
-		klog.V(0).InfoS("apiserver cores", ip, cores)
-		res[ip] = cores
-	}
-	if len(res) < 2 {
-		return nil, fmt.Errorf("expected two instances at least")
-	}
-	return res, nil
-}
-
 // mustClientset returns kubernetes clientset without error.
 func mustClientset(kubeCfgPath string) *kubernetes.Clientset {
 	config, err := clientcmd.BuildConfigFromFlags("", kubeCfgPath)
diff --git a/contrib/internal/types/report.go b/contrib/internal/types/report.go
new file mode 100644
index 0000000..0667e4b
--- /dev/null
+++ b/contrib/internal/types/report.go
@@ -0,0 +1,12 @@
+package types
+
+import apitypes "github.com/Azure/kperf/api/types"
+
+// BenchmarkReport represents runkperf-bench's result.
+type BenchmarkReport struct {
+	apitypes.RunnerGroupsReport
+	// Info is additional information.
+	//
+	// FIXME(weifu): Use struct after finalized.
+	Info map[string]interface{}
+}
diff --git a/contrib/internal/utils/utils.go b/contrib/internal/utils/utils.go
index 74d40d3..5fa4c5c 100644
--- a/contrib/internal/utils/utils.go
+++ b/contrib/internal/utils/utils.go
@@ -2,11 +2,13 @@ package utils

 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"net"
 	"os"
 	"os/exec"
 	"sort"
+	"strconv"
 	"strings"
 	"syscall"
 	"time"
@@ -134,44 +136,101 @@ func NewLoadProfileFromEmbed(target string, tweakFn func(*types.RunnerGroupSpec)
 // DeployRunnerGroup deploys runner group for benchmark.
 func DeployRunnerGroup(ctx context.Context,
 	kubeCfgPath, runnerImage, rgCfgFile string,
-	runnerFlowControl, runnerGroupAffinity string) (string, error) {
+	runnerFlowControl, runnerGroupAffinity string) (*types.RunnerGroupsReport, error) {

-	klog.V(0).InfoS("Deploying runner group", "config", rgCfgFile)
+	klog.InfoS("Deploying runner group", "config", rgCfgFile)

 	kr := NewKperfRunner(kubeCfgPath, runnerImage)

-	klog.V(0).Info("Deleting existing runner group")
+	klog.Info("Deleting existing runner group")
 	derr := kr.RGDelete(ctx, 0)
 	if derr != nil {
-		klog.V(0).ErrorS(derr, "failed to delete existing runner group")
+		klog.ErrorS(derr, "failed to delete existing runner group")
 	}

 	rerr := kr.RGRun(ctx, 0, rgCfgFile, runnerFlowControl, runnerGroupAffinity)
 	if rerr != nil {
-		return "", fmt.Errorf("failed to deploy runner group: %w", rerr)
+		return nil, fmt.Errorf("failed to deploy runner group: %w", rerr)
 	}

-	klog.V(0).Info("Waiting runner group")
+	klog.Info("Waiting for runner group")
 	for {
 		select {
 		case <-ctx.Done():
-			return "", ctx.Err()
+			return nil, ctx.Err()
 		default:
 		}

-		res, err := kr.RGResult(ctx, 1*time.Minute)
+		// NOTE: The result subcommand holds a long connection until the
+		// runner group's server replies, but no data is transferred until
+		// the runners finish. If the apiserver restarts in the meantime,
+		// the proxy tunnel breaks without notifying the client, which
+		// would then hang forever. The 1-minute timeout lets this loop
+		// retry so the result is still fetched.
+		data, err := kr.RGResult(ctx, 1*time.Minute)
 		if err != nil {
-			klog.V(0).ErrorS(err, "failed to fetch warmup runner group's result")
+			klog.ErrorS(err, "failed to fetch runner group's result")
 			continue
 		}
-		klog.V(0).InfoS("Runner group's result", "data", res)
+		klog.InfoS("Runner group's result", "data", data)

-		klog.V(0).Info("Deleting runner group")
+		var rgResult types.RunnerGroupsReport
+		if err = json.Unmarshal([]byte(data), &rgResult); err != nil {
+			return nil, fmt.Errorf("failed to unmarshal into RunnerGroupsReport: %w", err)
+		}
+
+		klog.Info("Deleting runner group")
 		if derr := kr.RGDelete(ctx, 0); derr != nil {
-			klog.V(0).ErrorS(err, "failed to delete runner group")
+			klog.ErrorS(derr, "failed to delete runner group")
+		}
+		return &rgResult, nil
+	}
+}
+
+// FetchAPIServerCores fetches the number of cores for each kube-apiserver.
+func FetchAPIServerCores(ctx context.Context, kubeCfgPath string) (map[string]int, error) {
+	klog.V(0).Info("Fetching apiserver's cores")
+
+	kr := NewKubectlRunner(kubeCfgPath, "")
+	fqdn, err := kr.FQDN(ctx, 0)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get cluster fqdn: %w", err)
+	}
+
+	ips, nerr := NSLookup(fqdn)
+	if nerr != nil {
+		return nil, fmt.Errorf("failed to get dns records of fqdn %s: %w", fqdn, nerr)
+	}
+
+	res := map[string]int{}
+	for _, ip := range ips {
+		cores, err := func() (int, error) {
+			data, err := kr.Metrics(ctx, 0, fqdn, ip)
+			if err != nil {
+				return 0, fmt.Errorf("failed to get metrics for ip %s: %w", ip, err)
+			}
+
+			lines := strings.Split(string(data), "\n")
+			for _, line := range lines {
+				if strings.HasPrefix(line, "go_sched_gomaxprocs_threads") {
+					vInStr := strings.Fields(line)[1]
+					v, err := strconv.Atoi(vInStr)
+					if err != nil {
+						return 0, fmt.Errorf("failed to parse go_sched_gomaxprocs_threads %s: %w", line, err)
+					}
+					return v, nil
+				}
+			}
+			return 0, fmt.Errorf("failed to get go_sched_gomaxprocs_threads")
+		}()
+		if err != nil {
+			klog.V(0).ErrorS(err, "failed to get cores", "ip", ip)
+			continue
 		}
-		return res, nil
+		klog.V(0).InfoS("apiserver cores", "ip", ip, "cores", cores)
+		res[ip] = cores
 	}
+	return res, nil
 }

 // FetchNodeProviderIDByType is used to get one node's provider id with a given
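Review note: the sketch below is not part of the patch. It is a minimal, self-contained illustration of the interceptor composition this change introduces in bench/utils.go, where the subcommand body only produces a report and wrappers enrich and render it. The names here (benchReport, actionFunc, withExtraInfo, withRender) are hypothetical stand-ins for internaltypes.BenchmarkReport, subcmdActionFunc, addAPIServerCoresInfoInterceptor, and renderBenchmarkReportInterceptor; the real code additionally threads *cli.Context through and honors the new --result flag.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
)

// benchReport mirrors the shape of BenchmarkReport: a core result plus a
// free-form Info map that interceptors can enrich.
type benchReport struct {
	Total int                    `json:"total"`
	Info  map[string]interface{} `json:"info"`
}

// actionFunc mirrors subcmdActionFunc: a subcommand body returning a report.
type actionFunc func() (*benchReport, error)

// withExtraInfo mirrors addAPIServerCoresInfoInterceptor: run the handler,
// then attach extra data to the report's Info map.
func withExtraInfo(handler actionFunc) actionFunc {
	return func() (*benchReport, error) {
		report, err := handler()
		if err != nil {
			return nil, err
		}
		report.Info["apiserver"] = map[string]interface{}{
			"cores": map[string]int{"before": 4, "after": 8}, // placeholder values
		}
		return report, nil
	}
}

// withRender mirrors renderBenchmarkReportInterceptor: run the handler, then
// write the final report as indented JSON (stdout here; the real code can
// also write to the file named by --result).
func withRender(handler actionFunc) actionFunc {
	return func() (*benchReport, error) {
		report, err := handler()
		if err != nil {
			return nil, err
		}
		enc := json.NewEncoder(os.Stdout)
		enc.SetIndent("", "  ")
		if err := enc.Encode(report); err != nil {
			return nil, fmt.Errorf("failed to encode json: %w", err)
		}
		return report, nil
	}
}

func main() {
	// The benchmark body produces only the core result; interceptors compose
	// around it outermost-first, matching the subcommand Action in the patch:
	// renderBenchmarkReportInterceptor(addAPIServerCoresInfoInterceptor(run)).
	run := func() (*benchReport, error) {
		return &benchReport{Total: 36000, Info: map[string]interface{}{}}, nil
	}
	if _, err := withRender(withExtraInfo(run))(); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
```

The point of the pattern is that future bench subcommands only implement the report-producing body and reuse the same enrichment and rendering wrappers.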