contrib/*: refactor node100_dp5_pod10k to node100_pod10k
1. Show ready replicas before the benchmark runs
2. Measure pod size as the length of its JSON encoding (see the sketch after the changed-files summary)
3. Allow changing the interval for restarting deployments

Signed-off-by: Wei Fu <[email protected]>
fuweid committed Apr 24, 2024
1 parent 9931b67 commit 8d6500b
Showing 14 changed files with 266 additions and 164 deletions.
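Item 2 above boils down to measuring a pod as the byte length of its JSON encoding; getDeploymentPodSize in node100_pod10k.go below does exactly this against a pod listed from the API server. A minimal self-contained sketch of the idea (toy pod, not one fetched from a cluster):

package main

import (
"encoding/json"
"fmt"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
// Toy pod; the benchmark measures real pods returned by the API server.
pod := corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "example", Namespace: "default"}}

data, err := json.Marshal(pod)
if err != nil {
panic(err)
}
fmt.Printf("pod JSON size: %d bytes\n", len(data))
}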
108 changes: 0 additions & 108 deletions contrib/cmd/runkperf/commands/bench/node100_dp5_pod10k.go

This file was deleted.

2 changes: 1 addition & 1 deletion contrib/cmd/runkperf/commands/bench/node100_job1_pod3k.go
@@ -46,7 +46,7 @@ func benchNode100Job1Pod3KCaseRun(cliCtx *cli.Context) (*internaltypes.Benchmark
}
defer func() { _ = rgCfgFileDone() }()

vcDone, err := deployVirtualNodepool(ctx, cliCtx, "node100job1pod3k", 100, 110)
vcDone, err := deployVirtualNodepool(ctx, cliCtx, "node100job1pod3k", 100, 32, 96, 110)
if err != nil {
return nil, fmt.Errorf("failed to deploy virtual node: %w", err)
}
206 changes: 206 additions & 0 deletions contrib/cmd/runkperf/commands/bench/node100_pod10k.go
@@ -0,0 +1,206 @@
package bench

import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

internaltypes "github.com/Azure/kperf/contrib/internal/types"
"github.com/Azure/kperf/contrib/internal/utils"

"github.com/urfave/cli"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
)

var benchNode100DeploymentNPod10KCase = cli.Command{
Name: "node100_pod10k",
Usage: `
The test suite sets up 100 virtual nodes and deploys N deployments totaling 10k
pods on those nodes. During the benchmark it repeatedly rolling-updates the
deployments one by one.
`,
Flags: []cli.Flag{
cli.IntFlag{
Name: "deployments",
Usage: "The total number of deployments for 10k pods",
Value: 20,
},
cli.IntFlag{
Name: "total",
Usage: "Total requests per runner (there are 10 runners in total, each with a rate of 10)",
Value: 36000,
},
cli.IntFlag{
Name: "padding-bytes",
Usage: "Add an annotation <key=data, value=randomStringByLen(padding-bytes)> to each pod to increase its size",
Value: 0,
},
cli.DurationFlag{
Name: "interval",
Usage: "Interval to restart deployments",
Value: time.Second * 10,
},
},
Action: func(cliCtx *cli.Context) error {
_, err := renderBenchmarkReportInterceptor(
addAPIServerCoresInfoInterceptor(benchNode100DeploymentNPod10KRun),
)(cliCtx)
return err
},
}
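// A hypothetical invocation wiring these flags together (binary and
// subcommand names follow this repo's contrib/cmd/runkperf layout; the
// flag values are illustrative, not defaults):
//
//	runkperf --kubeconfig ~/.kube/config bench node100_pod10k \
//	  --deployments 25 --padding-bytes 1024 --interval 15s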

// benchNode100DeploymentNPod10KRun is the action for subcommand benchNode100DeploymentNPod10KCase.
func benchNode100DeploymentNPod10KRun(cliCtx *cli.Context) (*internaltypes.BenchmarkReport, error) {
ctx := context.Background()
kubeCfgPath := cliCtx.GlobalString("kubeconfig")

rgCfgFile, rgSpec, rgCfgFileDone, err := newLoadProfileFromEmbed(cliCtx,
"loadprofile/node100_pod10k.yaml")
if err != nil {
return nil, err
}
defer func() { _ = rgCfgFileDone() }()

// NOTE: The nodepool name should be aligned with ../../../../internal/manifests/loadprofile/node100_pod10k.yaml.
vcDone, err := deployVirtualNodepool(ctx, cliCtx, "node100pod10k", 100, 16, 64, 110)
if err != nil {
return nil, fmt.Errorf("failed to deploy virtual node: %w", err)
}
defer func() { _ = vcDone() }()

dpCtx, dpCancel := context.WithCancel(ctx)
defer dpCancel()

var wg sync.WaitGroup
wg.Add(1)

restartInterval := cliCtx.Duration("interval")
klog.V(0).Infof("The interval is %v for restarting deployments", restartInterval)

paddingBytes := cliCtx.Int("padding-bytes")
total := cliCtx.Int("deployments")
// NOTE: integer division; total*replica falls slightly short of 10,000 when
// the deployment count does not divide it evenly.
replica := 10000 / total

// NOTE: The name pattern should be aligned with ../../../../internal/manifests/loadprofile/node100_pod10k.yaml.
deploymentNamePattern := "benchmark"

rollingUpdateFn, ruCleanupFn, err := utils.DeployAndRepeatRollingUpdateDeployments(dpCtx,
kubeCfgPath, deploymentNamePattern, total, replica, paddingBytes, restartInterval)
if err != nil {
dpCancel()
return nil, fmt.Errorf("failed to setup workload: %w", err)
}
defer ruCleanupFn()

err = dumpDeploymentReplicas(ctx, kubeCfgPath, deploymentNamePattern, total)
if err != nil {
return nil, err
}

podSize, err := getDeploymentPodSize(ctx, kubeCfgPath, deploymentNamePattern)
if err != nil {
return nil, err
}

// Round down to a multiple of 1024 so the reported pod size is KiB-aligned.
podSize = (podSize / 1024) * 1024

go func() {
defer wg.Done()

// FIXME(weifu):
//
// DeployRunnerGroup should return ready notification.
// The rolling update should run after runners.
rollingUpdateFn()
}()

rgResult, derr := utils.DeployRunnerGroup(ctx,
kubeCfgPath,
cliCtx.GlobalString("runner-image"),
rgCfgFile,
cliCtx.GlobalString("runner-flowcontrol"),
cliCtx.GlobalString("rg-affinity"),
)
dpCancel()
wg.Wait()

if derr != nil {
return nil, derr
}

return &internaltypes.BenchmarkReport{
Description: fmt.Sprintf(`
Environment: 100 virtual nodes managed by kwok-controller,
Workload: Deploy %d deployments with %d pods in total. Rolling-update the deployments one by one at an interval of %v`,
total, total*replica, restartInterval),

LoadSpec: *rgSpec,
Result: *rgResult,
Info: map[string]interface{}{
"podSizeInBytes": podSize,
"interval": restartInterval.String(),
},
}, nil
}

// dumpDeploymentReplicas logs each deployment's desired and ready replica counts.
func dumpDeploymentReplicas(ctx context.Context, kubeCfgPath string, namePattern string, total int) error {
klog.V(0).Info("Dump deployment's replica information")

cli, err := utils.BuildClientset(kubeCfgPath)
if err != nil {
return err
}

for i := 0; i < total; i++ {
name := fmt.Sprintf("%s-%d", namePattern, i)
ns := name

dp, err := cli.AppsV1().Deployments(ns).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get deployment %s in namespace %s: %w",
name, ns, err)
}

klog.V(0).InfoS("Deployment", "name", name, "ns", ns,
"replica", *dp.Spec.Replicas, "readyReplicas", dp.Status.ReadyReplicas)
}
return nil
}

// getDeploymentPodSize returns the size, in bytes, of the JSON encoding of one pod created by the deployments.
func getDeploymentPodSize(ctx context.Context, kubeCfgPath string, namePattern string) (int, error) {
ns := fmt.Sprintf("%s-0", namePattern)
labelSelector := fmt.Sprintf("app=%s", namePattern)

klog.V(0).InfoS("Get the size of pod", "labelSelector", labelSelector, "namespace", ns)

cli, err := utils.BuildClientset(kubeCfgPath)
if err != nil {
return 0, err
}

resp, err := cli.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{
LabelSelector: labelSelector,
Limit: 1,
})
if err != nil {
return 0, fmt.Errorf("failed to list pods with labelSelector %s: %w",
labelSelector, err)
}
if len(resp.Items) == 0 {
return 0, fmt.Errorf("no pod found with labelSelector %s in namespace %s",
labelSelector, ns)
}

pod := resp.Items[0]
data, err := json.Marshal(pod)
if err != nil {
return 0, fmt.Errorf("failed to json.Marshal pod: %w", err)
}
return len(data), nil
}
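The --padding-bytes usage string above references a randomStringByLen helper whose implementation is not part of this diff. A minimal sketch of what such a helper could look like (an assumption, not the actual contrib/internal/utils code):

package utils

import "math/rand"

const letterBytes = "abcdefghijklmnopqrstuvwxyz0123456789"

// randomStringByLen returns a random alphanumeric string of length n; the
// benchmark stores it in a pod annotation to pad the object up to
// --padding-bytes.
func randomStringByLen(n int) string {
b := make([]byte, n)
for i := range b {
b[i] = letterBytes[rand.Intn(len(letterBytes))]
}
return string(b)
}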
2 changes: 1 addition & 1 deletion contrib/cmd/runkperf/commands/bench/root.go
@@ -53,6 +53,6 @@ var Command = cli.Command{
},
Subcommands: []cli.Command{
benchNode100Job1Pod3KCase,
benchNode100Deployment5Pod10KCase,
benchNode100DeploymentNPod10KCase,
},
}
4 changes: 2 additions & 2 deletions contrib/cmd/runkperf/commands/bench/utils.go
@@ -90,7 +90,7 @@ func renderBenchmarkReportInterceptor(handler subcmdActionFunc) subcmdActionFunc
}

// deployVirtualNodepool deploys a virtual nodepool with the given node count, per-node cpu/memory capacity, and maxPods.
func deployVirtualNodepool(ctx context.Context, cliCtx *cli.Context, target string, nodes, maxPods int) (func() error, error) {
func deployVirtualNodepool(ctx context.Context, cliCtx *cli.Context, target string, nodes, cpu, memory, maxPods int) (func() error, error) {
klog.V(0).InfoS("Deploying virtual nodepool", "name", target)

kubeCfgPath := cliCtx.GlobalString("kubeconfig")
@@ -114,7 +114,7 @@ func deployVirtualNodepool(ctx context.Context, cliCtx *cli.Context, target stri
klog.V(0).ErrorS(err, "failed to delete nodepool", "name", target)
}

err = kr.NewNodepool(ctx, 0, target, nodes, maxPods, virtualNodeAffinity, sharedProviderID)
err = kr.NewNodepool(ctx, 0, target, nodes, cpu, memory, maxPods, virtualNodeAffinity, sharedProviderID)
if err != nil {
return nil, fmt.Errorf("failed to create nodepool %s: %w", target, err)
}
2 changes: 1 addition & 1 deletion contrib/cmd/runkperf/commands/ekswarmup/command.go
@@ -168,7 +168,7 @@ func deployWarmupVirtualNodepool(ctx context.Context, kubeCfgPath string) (func(
klog.V(0).ErrorS(err, "failed to delete", "nodepool", target)
}

err = kr.NewNodepool(ctx, 0, target, 100, 110,
err = kr.NewNodepool(ctx, 0, target, 100, 32, 96, 110,
"node.kubernetes.io/instance-type=m4.2xlarge", sharedProviderID)
if err != nil {
return nil, fmt.Errorf("failed to create nodepool %s: %w", target, err)
contrib/internal/manifests/loadprofile/node100_pod10k.yaml
@@ -1,7 +1,7 @@
count: 10
loadProfile:
version: 1
description: "node100-deployment5-pod10k"
description: "node100-pod10k"
spec:
rate: 10
total: 36000
@@ -14,12 +14,7 @@ loadProfile:
- staleList:
version: v1
resource: pods
# NOTE: Please align with ../../utils/utils.go#RepeatRollingUpdate10KPod
seletor: "app=benchmark"
# NOTE: Please align with ../../../cmd/runkperf/commands/bench/node100_dp5_pod10k.go.
# And there are only 100 nodes and each node can run 150 pods. It should
# have items in the response.
fieldSelector: "spec.nodeName=node100dp5pod10k-49"
fieldSelector: "spec.nodeName=node100pod10k-49"
shares: 1000 # 1000 / (1000 + 100 + 200) * 10 = 7.7 req/s
- staleList:
version: v1
@@ -33,8 +28,5 @@ loadProfile:
# including kubelet, when they want to get pods from ETCD. The limit
# is 100 because it's close to MaxPods value.
limit: 100
# NOTE: Please align with ../../../cmd/runkperf/commands/bench/node100_dp5_pod10k.go.
fieldSelector: "spec.nodeName=node100dp5pod10k-49"
# And there are only 100 nodes and each node can run 150 pods. It should
# have items in the response.
seletor: "app=benchmark"
shares: 200 # 200 / (1000 + 100 + 200) * 10 = 1.5 req/s
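For reference, the per-client rates in the comments above come from splitting the profile's rate (10 req/s) in proportion to shares: rate_i = s_i / (s_1 + s_2 + s_3) * rate, so 1000/1300 * 10 ≈ 7.7 req/s and 200/1300 * 10 ≈ 1.5 req/s (the remaining client with shares 100 gets ≈ 0.8 req/s).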

This file was deleted.

