Commit
Merge pull request #118 from fuweid/weifu/always-list-third-one-namespace

contrib/*: refactor node100_dp5_pod10k to node100_pod10k
fuweid authored Apr 24, 2024
2 parents 9931b67 + 8d6500b commit 7e101e6
Showing 14 changed files with 266 additions and 164 deletions.
108 changes: 0 additions & 108 deletions contrib/cmd/runkperf/commands/bench/node100_dp5_pod10k.go

This file was deleted.

2 changes: 1 addition & 1 deletion contrib/cmd/runkperf/commands/bench/node100_job1_pod3k.go
@@ -46,7 +46,7 @@ func benchNode100Job1Pod3KCaseRun(cliCtx *cli.Context) (*internaltypes.Benchmark
}
defer func() { _ = rgCfgFileDone() }()

vcDone, err := deployVirtualNodepool(ctx, cliCtx, "node100job1pod3k", 100, 110)
vcDone, err := deployVirtualNodepool(ctx, cliCtx, "node100job1pod3k", 100, 32, 96, 110)
if err != nil {
return nil, fmt.Errorf("failed to deploy virtual node: %w", err)
}
206 changes: 206 additions & 0 deletions contrib/cmd/runkperf/commands/bench/node100_pod10k.go
@@ -0,0 +1,206 @@
package bench

import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

internaltypes "github.com/Azure/kperf/contrib/internal/types"
"github.com/Azure/kperf/contrib/internal/utils"

"github.com/urfave/cli"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
)

var benchNode100DeploymentNPod10KCase = cli.Command{
Name: "node100_pod10k",
Usage: `
The test suite sets up 100 virtual nodes and deploys N deployments for 10k
pods on those nodes. It repeatedly rolling-updates the deployments one by one
during the benchmark.
`,
Flags: []cli.Flag{
cli.IntFlag{
Name: "deployments",
Usage: "The total number of deployments for 10k pods",
Value: 20,
},
cli.IntFlag{
Name: "total",
Usage: "Total requests per runner (There are 10 runners totally and runner's rate is 10)",
Value: 36000,
},
cli.IntFlag{
Name: "padding-bytes",
Usage: "Add <key=data, value=randomStringByLen(padding-bytes)> in pod's annotation to increase pod size",
Value: 0,
},
cli.DurationFlag{
Name: "interval",
Usage: "Interval to restart deployments",
Value: time.Second * 10,
},
},
Action: func(cliCtx *cli.Context) error {
_, err := renderBenchmarkReportInterceptor(
addAPIServerCoresInfoInterceptor(benchNode100DeploymentNPod10KRun),
)(cliCtx)
return err
},
}

// benchNode100DeploymentNPod10KRun is the action for subcommand benchNode100DeploymentNPod10KCase.
func benchNode100DeploymentNPod10KRun(cliCtx *cli.Context) (*internaltypes.BenchmarkReport, error) {
ctx := context.Background()
kubeCfgPath := cliCtx.GlobalString("kubeconfig")

rgCfgFile, rgSpec, rgCfgFileDone, err := newLoadProfileFromEmbed(cliCtx,
"loadprofile/node100_pod10k.yaml")
if err != nil {
return nil, err
}
defer func() { _ = rgCfgFileDone() }()

// NOTE: The nodepool name should be aligned with ../../../../internal/manifests/loadprofile/node100_pod10k.yaml.
vcDone, err := deployVirtualNodepool(ctx, cliCtx, "node100pod10k", 100, 16, 64, 110)
if err != nil {
return nil, fmt.Errorf("failed to deploy virtual node: %w", err)
}
defer func() { _ = vcDone() }()

dpCtx, dpCancel := context.WithCancel(ctx)
defer dpCancel()

var wg sync.WaitGroup
wg.Add(1)

restartInterval := cliCtx.Duration("interval")
klog.V(0).Infof("The interval is %v for restaring deployments", restartInterval)

paddingBytes := cliCtx.Int("padding-bytes")
total := cliCtx.Int("deployments")
replica := 10000 / total

// NOTE: The name pattern should be aligned with ../../../../internal/manifests/loadprofile/node100_pod10k.yaml.
deploymentNamePattern := "benchmark"

rollingUpdateFn, ruCleanupFn, err := utils.DeployAndRepeatRollingUpdateDeployments(dpCtx,
kubeCfgPath, deploymentNamePattern, total, replica, paddingBytes, restartInterval)
if err != nil {
dpCancel()
return nil, fmt.Errorf("failed to setup workload: %w", err)
}
defer ruCleanupFn()

err = dumpDeploymentReplicas(ctx, kubeCfgPath, deploymentNamePattern, total)
if err != nil {
return nil, err
}

podSize, err := getDeploymentPodSize(ctx, kubeCfgPath, deploymentNamePattern)
if err != nil {
return nil, err
}

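// Round the observed pod size down to the nearest multiple of 1024 bytes.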
podSize = (podSize / 1024) * 1024

go func() {
defer wg.Done()

// FIXME(weifu):
//
// DeployRunnerGroup should return ready notification.
// The rolling update should run after runners.
rollingUpdateFn()
}()

rgResult, derr := utils.DeployRunnerGroup(ctx,
cliCtx.GlobalString("kubeconfig"),
cliCtx.GlobalString("runner-image"),
rgCfgFile,
cliCtx.GlobalString("runner-flowcontrol"),
cliCtx.GlobalString("rg-affinity"),
)
dpCancel()
wg.Wait()

if derr != nil {
return nil, derr
}

return &internaltypes.BenchmarkReport{
Description: fmt.Sprintf(`
Environment: 100 virtual nodes managed by kwok-controller,
Workload: Deploy %d deployments with %d pods in total. Rolling-update the deployments one by one at an interval of %v`,
total, total*replica, restartInterval),

LoadSpec: *rgSpec,
Result: *rgResult,
Info: map[string]interface{}{
"podSizeInBytes": podSize,
"interval": restartInterval.String(),
},
}, nil
}

// dumpDeploymentReplicas dumps each deployment's replica and ready-replica counts.
func dumpDeploymentReplicas(ctx context.Context, kubeCfgPath string, namePattern string, total int) error {
klog.V(0).Info("Dump deployment's replica information")

cli, err := utils.BuildClientset(kubeCfgPath)
if err != nil {
return err
}

for i := 0; i < total; i++ {
name := fmt.Sprintf("%s-%d", namePattern, i)
ns := name

dp, err := cli.AppsV1().Deployments(ns).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get deployment %s in namespace %s: %w",
name, ns, err)
}

klog.V(0).InfoS("Deployment", "name", name, "ns", ns,
"replica", *dp.Spec.Replicas, "readyReplicas", dp.Status.ReadyReplicas)
}
return nil
}

// getDeploymentPodSize returns the JSON-encoded size, in bytes, of one pod created by the deployments.
func getDeploymentPodSize(ctx context.Context, kubeCfgPath string, namePattern string) (int, error) {
ns := fmt.Sprintf("%s-0", namePattern)
labelSelector := fmt.Sprintf("app=%s", namePattern)

klog.V(0).InfoS("Get the size of pod", "labelSelector", labelSelector, "namespace", ns)

cli, err := utils.BuildClientset(kubeCfgPath)
if err != nil {
return 0, err
}

resp, err := cli.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{
LabelSelector: labelSelector,
Limit: 1,
})
if err != nil {
return 0, fmt.Errorf("failed to list pods with labelSelector %s: %w",
labelSelector, err)
}
if len(resp.Items) == 0 {
return 0, fmt.Errorf("no pod with labelSelector %s in namespace %s",
labelSelector, ns)
}

pod := resp.Items[0]
data, err := json.Marshal(pod)
if err != nil {
return 0, fmt.Errorf("failed to json.Marshal pod: %w", err)
}
return len(data), nil
}
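
The rolling-update workload above is delegated to utils.DeployAndRepeatRollingUpdateDeployments, whose implementation is not part of this diff. As a rough, hypothetical sketch of the pattern (not the actual helper), one way to restart deployments one by one on an interval is to bump a pod-template annotation so the deployment controller rolls out new pods. The sketch assumes deployments named benchmark-0..benchmark-<N-1>, each in a namespace of the same name, matching the name pattern used above:

package main

import (
	"context"
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

// rollingUpdateOneByOne patches each deployment's pod template on a fixed
// interval, which triggers a rolling update, and loops until ctx is canceled.
func rollingUpdateOneByOne(ctx context.Context, cli kubernetes.Interface, namePattern string, total int, interval time.Duration) error {
	for {
		for i := 0; i < total; i++ {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(interval):
			}

			name := fmt.Sprintf("%s-%d", namePattern, i)
			ns := name // one namespace per deployment, as in the benchmark

			// Changing a pod-template annotation makes the deployment
			// controller roll out new pods, like `kubectl rollout restart`.
			patch := fmt.Sprintf(
				`{"spec":{"template":{"metadata":{"annotations":{"restartedAt":%q}}}}}`,
				time.Now().Format(time.RFC3339))

			_, err := cli.AppsV1().Deployments(ns).Patch(ctx, name,
				types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
			if err != nil {
				return fmt.Errorf("failed to restart deployment %s/%s: %w", ns, name, err)
			}
		}
	}
}

func main() {
	// The kubeconfig path is a placeholder; runkperf takes it from --kubeconfig.
	cfg, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
	if err != nil {
		panic(err)
	}
	cli, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}
	_ = rollingUpdateOneByOne(context.Background(), cli, "benchmark", 20, 10*time.Second)
}

With the flag defaults above (20 deployments, a 10s interval), one full pass over all deployments takes roughly 200 seconds plus however long each rollout needs.
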
2 changes: 1 addition & 1 deletion contrib/cmd/runkperf/commands/bench/root.go
@@ -53,6 +53,6 @@ var Command = cli.Command{
},
Subcommands: []cli.Command{
benchNode100Job1Pod3KCase,
benchNode100Deployment5Pod10KCase,
benchNode100DeploymentNPod10KCase,
},
}
4 changes: 2 additions & 2 deletions contrib/cmd/runkperf/commands/bench/utils.go
@@ -90,7 +90,7 @@ func renderBenchmarkReportInterceptor(handler subcmdActionFunc) subcmdActionFunc
}

// deployVirtualNodepool deploys a virtual nodepool.
func deployVirtualNodepool(ctx context.Context, cliCtx *cli.Context, target string, nodes, maxPods int) (func() error, error) {
func deployVirtualNodepool(ctx context.Context, cliCtx *cli.Context, target string, nodes, cpu, memory, maxPods int) (func() error, error) {
klog.V(0).InfoS("Deploying virtual nodepool", "name", target)

kubeCfgPath := cliCtx.GlobalString("kubeconfig")
@@ -114,7 +114,7 @@ func deployVirtualNodepool(ctx context.Context, cliCtx *cli.Context, target stri
klog.V(0).ErrorS(err, "failed to delete nodepool", "name", target)
}

err = kr.NewNodepool(ctx, 0, target, nodes, maxPods, virtualNodeAffinity, sharedProviderID)
err = kr.NewNodepool(ctx, 0, target, nodes, cpu, memory, maxPods, virtualNodeAffinity, sharedProviderID)
if err != nil {
return nil, fmt.Errorf("failed to create nodepool %s: %w", target, err)
}
2 changes: 1 addition & 1 deletion contrib/cmd/runkperf/commands/ekswarmup/command.go
@@ -168,7 +168,7 @@ func deployWarmupVirtualNodepool(ctx context.Context, kubeCfgPath string) (func(
klog.V(0).ErrorS(err, "failed to delete", "nodepool", target)
}

err = kr.NewNodepool(ctx, 0, target, 100, 110,
err = kr.NewNodepool(ctx, 0, target, 100, 32, 96, 110,
"node.kubernetes.io/instance-type=m4.2xlarge", sharedProviderID)
if err != nil {
return nil, fmt.Errorf("failed to create nodepool %s: %w", target, err)
contrib/internal/manifests/loadprofile/node100_pod10k.yaml (renamed from node100_dp5_pod10k.yaml)
@@ -1,7 +1,7 @@
count: 10
loadProfile:
version: 1
description: "node100-deployment5-pod10k"
description: "node100-pod10k"
spec:
rate: 10
total: 36000
@@ -14,12 +14,7 @@ loadProfile:
- staleList:
version: v1
resource: pods
# NOTE: Please align with ../../utils/utils.go#RepeatRollingUpdate10KPod
seletor: "app=benchmark"
# NOTE: Please align with ../../../cmd/runkperf/commands/bench/node100_dp5_pod10k.go.
# And there are only 100 nodes and each node can run 150 pods. It should
# have items in the response.
fieldSelector: "spec.nodeName=node100dp5pod10k-49"
fieldSelector: "spec.nodeName=node100pod10k-49"
shares: 1000 # 1000 / (1000 + 100 + 200) * 10 = 7.7 req/s
- staleList:
version: v1
@@ -33,8 +28,5 @@ loadProfile:
# including kubelet, when they want to get pods from ETCD. The limit
# is 100 because it's close to MaxPods value.
limit: 100
# NOTE: Please align with ../../../cmd/runkperf/commands/bench/node100_dp5_pod10k.go.
fieldSelector: "spec.nodeName=node100dp5pod10k-49"
# And there are only 100 nodes and each node can run 150 pods. It should
# have items in the response.
seletor: "app=benchmark"
shares: 200 # 200 / (1000 + 100 + 200) * 10 = 1.5 req/s
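
For reference, the effective rate of each request type in this profile is share-weighted, as the inline comments compute: with rate: 10 and shares of 1000, 100 and 200 (the 100-share entry sits in a hunk not shown here), the stale list of pods gets 1000 / 1300 * 10 ≈ 7.7 req/s and the limited list gets 200 / 1300 * 10 ≈ 1.5 req/s. Likewise, total: 36000 at 10 req/s works out to roughly an hour of sustained load per runner.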
