Skip to content

*: introduce --runner-flowcontrol to rg run #77

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion cmd/kperf/commands/runnergroup/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package runnergroup
import (
"context"
"fmt"
"strconv"
"strings"

"github.com/Azure/kperf/api/types"
"github.com/Azure/kperf/cmd/kperf/commands/utils"
Expand Down Expand Up @@ -32,6 +34,11 @@ var runCommand = cli.Command{
// Right now, we need to set image manually.
Required: true,
},
cli.StringFlag{
Name: "runner-flowcontrol",
Usage: "Apply flowcontrol to runner group. (FORMAT: PriorityLevel:MatchingPrecedence)",
Value: "workload-low:1000",
},
cli.StringSliceFlag{
Name: "affinity",
Usage: "Deploy server to the node with a specific labels (FORMAT: KEY=VALUE[,VALUE])",
Expand All @@ -48,6 +55,11 @@ var runCommand = cli.Command{
return fmt.Errorf("failed to parse affinity: %w", err)
}

priorityLevel, matchingPrecedence, err := parseFlowControl(cliCtx.String("runner-flowcontrol"))
if err != nil {
return fmt.Errorf("failed to parse runner-flowcontrol: %w", err)
}

specs, err := loadRunnerGroupSpec(cliCtx)
if err != nil {
return fmt.Errorf("failed to load runner group spec: %w", err)
Expand All @@ -61,7 +73,8 @@ var runCommand = cli.Command{
kubeCfgPath,
imgRef,
specs[0],
affinityLabels,
runner.WithRunCmdServerNodeSelectorsOpt(affinityLabels),
runner.WithRunCmdRunnerGroupFlowControl(priorityLevel, matchingPrecedence),
)
},
}
Expand All @@ -86,3 +99,19 @@ func loadRunnerGroupSpec(cliCtx *cli.Context) ([]*types.RunnerGroupSpec, error)
}
return specs, nil
}

// parseFlowControl parses a "PriorityLevel:MatchingPrecedence" value into the
// priority level name and the integer matching precedence.
//
// The value is split at the first ":". Both sides must be non-empty and the
// right-hand side must be a decimal integer in [1, 10000], which is the range
// the Kubernetes FlowSchema API accepts for spec.matchingPrecedence; rejecting
// other values here surfaces the mistake before anything is deployed.
//
// On any error the returned priorityLevel is "" and matchingPrecedence is 0,
// so callers never observe partially parsed results.
func parseFlowControl(value string) (priorityLevel string, matchingPrecedence int, err error) {
	const (
		minMatchingPrecedence = 1
		maxMatchingPrecedence = 10000
	)

	level, rawPrecedence, found := strings.Cut(value, ":")
	if !found || level == "" || rawPrecedence == "" {
		return "", 0, fmt.Errorf("expected PriorityLevel:MatchingPrecedence format, but got %s", value)
	}

	precedence, err := strconv.Atoi(rawPrecedence)
	if err != nil {
		return "", 0, fmt.Errorf("failed to parse matchingPrecedence into int: %w", err)
	}

	if precedence < minMatchingPrecedence || precedence > maxMatchingPrecedence {
		return "", 0, fmt.Errorf("matchingPrecedence must be in [%d, %d], but got %d",
			minMatchingPrecedence, maxMatchingPrecedence, precedence)
	}
	return level, precedence, nil
}
27 changes: 27 additions & 0 deletions manifests/runnergroup/server/templates/flowcontrol.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# FlowSchema for the runner group.
#
# It matches requests made by the ServiceAccount named .Values.name in the
# release namespace, for every API group, resource, verb and namespace
# (including cluster-scoped requests), and assigns them to the priority level
# given by .Values.flowcontrol.priorityLevelConfiguration with the precedence
# given by .Values.flowcontrol.matchingPrecedence.
apiVersion: flowcontrol.apiserver.k8s.io/v1beta3
kind: FlowSchema
metadata:
name: {{ .Values.name }}
# NOTE(review): FlowSchema is presumably a cluster-scoped kind, in which case
# this namespace has no effect - confirm against the Kubernetes API reference.
namespace: {{ .Release.Namespace }}
spec:
distinguisherMethod:
type: ByUser
matchingPrecedence: {{ .Values.flowcontrol.matchingPrecedence }}
priorityLevelConfiguration:
name: {{ .Values.flowcontrol.priorityLevelConfiguration }}
rules:
# Wildcards below: the schema is not restricted by API group, namespace,
# resource or verb; only the subject narrows what it matches.
- resourceRules:
- apiGroups:
- '*'
clusterScope: true
namespaces:
- '*'
resources:
- '*'
verbs:
- '*'
subjects:
- kind: ServiceAccount
serviceAccount:
name: {{ .Values.name }}
namespace: {{ .Release.Namespace }}
3 changes: 3 additions & 0 deletions manifests/runnergroup/server/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@ image: ""
# TODO(weifu): need https://github.com/Azure/kperf/issues/25 to support list
runnerGroupSpec: ""
nodeSelectors: {}
flowcontrol:
priorityLevelConfiguration: workload-low
matchingPrecedence: 1000
88 changes: 72 additions & 16 deletions runner/runnergroup_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,19 @@ import (
"github.com/Azure/kperf/helmcli"
"github.com/Azure/kperf/manifests"

"gopkg.in/yaml.v2"
"gopkg.in/yaml.v3"
)

// defaultRunCmdCfg is the baseline run command configuration that options are
// applied on top of. Its flowcontrol values mirror the defaults in
// ../manifests/runnergroup/server/values.yaml.
var defaultRunCmdCfg = func() runCmdConfig {
	var cfg runCmdConfig
	cfg.runnerGroupFlowcontrol.priorityLevel = "workload-low"
	cfg.runnerGroupFlowcontrol.matchingPrecedence = 1000
	return cfg
}()

// CreateRunnerGroupServer creates a long running server to deploy runner groups.
Expand All @@ -21,21 +33,21 @@ func CreateRunnerGroupServer(ctx context.Context,
kubeconfigPath string,
runnerImage string,
rgSpec *types.RunnerGroupSpec,
nodeSelectors map[string][]string,
opts ...RunCmdOpt,
) error {
specInStr, err := tweakAndMarshalSpec(rgSpec)
if err != nil {
return err
}

nodeSelectorsInYAML, err := renderNodeSelectors(nodeSelectors)
if err != nil {
return err
cfg := defaultRunCmdCfg
for _, opt := range opts {
opt(&cfg)
}

nodeSelectorsAppiler, err := helmcli.YAMLValuesApplier(nodeSelectorsInYAML)
appiler, err := cfg.toServerHelmValuesAppiler()
if err != nil {
return fmt.Errorf("failed to prepare YAML value applier for nodeSelectors: %w", err)
return err
}

getCli, err := helmcli.NewGetCli(kubeconfigPath, runnerGroupReleaseNamespace)
Expand Down Expand Up @@ -64,7 +76,7 @@ func CreateRunnerGroupServer(ctx context.Context,
"image="+runnerImage,
"runnerGroupSpec="+specInStr,
),
nodeSelectorsAppiler,
appiler,
)
if err != nil {
return fmt.Errorf("failed to create helm release client: %w", err)
Expand All @@ -88,16 +100,60 @@ func tweakAndMarshalSpec(spec *types.RunnerGroupSpec) (string, error) {
return string(data), nil
}

// renderNodeSelectors renders labels into YAML string.
func renderNodeSelectors(labels map[string][]string) (string, error) {
// NOTE: It should be aligned with ../manifests/runnergroup/server/values.yaml.
target := map[string]interface{}{
"nodeSelectors": labels,
// runCmdConfig holds the optional settings of the run command. RunCmdOpt
// functions mutate it, and toServerHelmValuesAppiler renders it into Helm
// values for the server chart.
type runCmdConfig struct {
// serverNodeSelectors forces the server to be scheduled onto nodes that carry
// these specific labels. It is rendered as the "nodeSelectors" Helm value.
serverNodeSelectors map[string][]string
// runnerGroupFlowcontrol applies flowcontrol settings to runners.
//
// NOTE: Please align with ../manifests/runnergroup/server/values.yaml
//
// FIXME(weifu): before v1.0.0, we should define type in ../manifests.
runnerGroupFlowcontrol struct {
// priorityLevel is rendered as the "flowcontrol.priorityLevelConfiguration"
// Helm value.
priorityLevel string
// matchingPrecedence is rendered as the "flowcontrol.matchingPrecedence"
// Helm value.
matchingPrecedence int
}

// TODO(weifu): merge name/image/specs into this
}

// RunCmdOpt is a functional option that updates the run command's settings in
// place. CreateRunnerGroupServer applies the given options, in order, to a
// copy of defaultRunCmdCfg.
type RunCmdOpt func(*runCmdConfig)

// WithRunCmdServerNodeSelectorsOpt returns an option that replaces the
// server's node selectors with labels. The map is stored as given, not copied.
func WithRunCmdServerNodeSelectorsOpt(labels map[string][]string) RunCmdOpt {
	setSelectors := func(c *runCmdConfig) {
		c.serverNodeSelectors = labels
	}
	return setSelectors
}

// WithRunCmdRunnerGroupFlowControl returns an option that overrides both the
// priority level and the matching precedence of the runner groups'
// flowcontrol settings.
func WithRunCmdRunnerGroupFlowControl(priorityLevel string, matchingPrecedence int) RunCmdOpt {
	return func(c *runCmdConfig) {
		fc := &c.runnerGroupFlowcontrol
		fc.priorityLevel, fc.matchingPrecedence = priorityLevel, matchingPrecedence
	}
}

// toServerHelmValuesAppiler renders the config into a YAML document and wraps
// it in a helmcli.ValuesApplier for the server chart.
//
// It returns an error if the config cannot be marshalled into YAML or if
// helmcli cannot build an applier from the rendered document.
//
// NOTE: It should be aligned with ../manifests/runnergroup/server/values.yaml.
func (cfg *runCmdConfig) toServerHelmValuesAppiler() (helmcli.ValuesApplier, error) {
	// The keys below are the value names the chart expects.
	values := map[string]interface{}{
		"nodeSelectors": cfg.serverNodeSelectors,
		"flowcontrol": map[string]interface{}{
			"priorityLevelConfiguration": cfg.runnerGroupFlowcontrol.priorityLevel,
			"matchingPrecedence":         cfg.runnerGroupFlowcontrol.matchingPrecedence,
		},
	}

	rawData, err := yaml.Marshal(values)
	if err != nil {
		return nil, fmt.Errorf("failed to render run command config into YAML: %w", err)
	}

	applier, err := helmcli.YAMLValuesApplier(string(rawData))
	if err != nil {
		return nil, fmt.Errorf("failed to prepare value appiler for run command config: %w", err)
	}
	return applier, nil
}