From 3147c73d91cfc7cb4a8fa1506f245efe718e08fd Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Tue, 30 Jan 2024 09:12:53 +0000 Subject: [PATCH] *: introduce --runner-flowcontrol to rg run * Introduce runCmdCfg and option pattern to setup server helm release. * Introduce flowcontrol to runner group. Signed-off-by: Wei Fu --- cmd/kperf/commands/runnergroup/run.go | 31 ++++++- .../server/templates/flowcontrol.yaml | 27 ++++++ manifests/runnergroup/server/values.yaml | 3 + runner/runnergroup_run.go | 88 +++++++++++++++---- 4 files changed, 132 insertions(+), 17 deletions(-) create mode 100644 manifests/runnergroup/server/templates/flowcontrol.yaml diff --git a/cmd/kperf/commands/runnergroup/run.go b/cmd/kperf/commands/runnergroup/run.go index 5fc3f51..cab682b 100644 --- a/cmd/kperf/commands/runnergroup/run.go +++ b/cmd/kperf/commands/runnergroup/run.go @@ -3,6 +3,8 @@ package runnergroup import ( "context" "fmt" + "strconv" + "strings" "github.com/Azure/kperf/api/types" "github.com/Azure/kperf/cmd/kperf/commands/utils" @@ -32,6 +34,11 @@ var runCommand = cli.Command{ // Right now, we need to set image manually. Required: true, }, + cli.StringFlag{ + Name: "runner-flowcontrol", + Usage: "Apply flowcontrol to runner group. (FORMAT: PriorityLevel:MatchingPrecedence)", + Value: "workload-low:1000", + }, cli.StringSliceFlag{ Name: "affinity", Usage: "Deploy server to the node with a specific labels (FORMAT: KEY=VALUE[,VALUE])", @@ -48,6 +55,11 @@ var runCommand = cli.Command{ return fmt.Errorf("failed to parse affinity: %w", err) } + priorityLevel, matchingPrecedence, err := parseFlowControl(cliCtx.String("runner-flowcontrol")) + if err != nil { + return fmt.Errorf("failed to parse runner-flowcontrol: %w", err) + } + specs, err := loadRunnerGroupSpec(cliCtx) if err != nil { return fmt.Errorf("failed to load runner group spec: %w", err) @@ -61,7 +73,8 @@ var runCommand = cli.Command{ kubeCfgPath, imgRef, specs[0], - affinityLabels, + runner.WithRunCmdServerNodeSelectorsOpt(affinityLabels), + runner.WithRunCmdRunnerGroupFlowControl(priorityLevel, matchingPrecedence), ) }, } @@ -86,3 +99,19 @@ func loadRunnerGroupSpec(cliCtx *cli.Context) ([]*types.RunnerGroupSpec, error) } return specs, nil } + +// parseFlowControl parses PriorityLevel:MatchingPrecedence into string and int. +func parseFlowControl(value string) (priorityLevel string, matchingPrecedence int, err error) { + l, r, ok := strings.Cut(value, ":") + if !ok || len(l) == 0 || len(r) == 0 { + err = fmt.Errorf("expected PriorityLevel:MatchingPrecedence format, but got %s", value) + return + } + + priorityLevel = l + matchingPrecedence, err = strconv.Atoi(r) + if err != nil { + err = fmt.Errorf("failed to parse matchingPrecedence into int: %w", err) + } + return +} diff --git a/manifests/runnergroup/server/templates/flowcontrol.yaml b/manifests/runnergroup/server/templates/flowcontrol.yaml new file mode 100644 index 0000000..9f0842c --- /dev/null +++ b/manifests/runnergroup/server/templates/flowcontrol.yaml @@ -0,0 +1,27 @@ +apiVersion: flowcontrol.apiserver.k8s.io/v1beta3 +kind: FlowSchema +metadata: + name: {{ .Values.name }} + namespace: {{ .Release.Namespace }} +spec: + distinguisherMethod: + type: ByUser + matchingPrecedence: {{ .Values.flowcontrol.matchingPrecedence }} + priorityLevelConfiguration: + name: {{ .Values.flowcontrol.priorityLevelConfiguration }} + rules: + - resourceRules: + - apiGroups: + - '*' + clusterScope: true + namespaces: + - '*' + resources: + - '*' + verbs: + - '*' + subjects: + - kind: ServiceAccount + serviceAccount: + name: {{ .Values.name }} + namespace: {{ .Release.Namespace }} diff --git a/manifests/runnergroup/server/values.yaml b/manifests/runnergroup/server/values.yaml index fd028bb..a1e3809 100644 --- a/manifests/runnergroup/server/values.yaml +++ b/manifests/runnergroup/server/values.yaml @@ -3,3 +3,6 @@ image: "" # TODO(weifu): need https://github.com/Azure/kperf/issues/25 to support list runnerGroupSpec: "" nodeSelectors: {} +flowcontrol: + priorityLevelConfiguration: workload-low + matchingPrecedence: 1000 diff --git a/runner/runnergroup_run.go b/runner/runnergroup_run.go index 12c6db4..b3e6cec 100644 --- a/runner/runnergroup_run.go +++ b/runner/runnergroup_run.go @@ -9,7 +9,19 @@ import ( "github.com/Azure/kperf/helmcli" "github.com/Azure/kperf/manifests" - "gopkg.in/yaml.v2" + "gopkg.in/yaml.v3" +) + +var ( + defaultRunCmdCfg = runCmdConfig{ + runnerGroupFlowcontrol: struct { + priorityLevel string + matchingPrecedence int + }{ + priorityLevel: "workload-low", + matchingPrecedence: 1000, + }, + } ) // CreateRunnerGroupServer creates a long running server to deploy runner groups. @@ -21,21 +33,21 @@ func CreateRunnerGroupServer(ctx context.Context, kubeconfigPath string, runnerImage string, rgSpec *types.RunnerGroupSpec, - nodeSelectors map[string][]string, + opts ...RunCmdOpt, ) error { specInStr, err := tweakAndMarshalSpec(rgSpec) if err != nil { return err } - nodeSelectorsInYAML, err := renderNodeSelectors(nodeSelectors) - if err != nil { - return err + cfg := defaultRunCmdCfg + for _, opt := range opts { + opt(&cfg) } - nodeSelectorsAppiler, err := helmcli.YAMLValuesApplier(nodeSelectorsInYAML) + appiler, err := cfg.toServerHelmValuesAppiler() if err != nil { - return fmt.Errorf("failed to prepare YAML value applier for nodeSelectors: %w", err) + return err } getCli, err := helmcli.NewGetCli(kubeconfigPath, runnerGroupReleaseNamespace) @@ -64,7 +76,7 @@ func CreateRunnerGroupServer(ctx context.Context, "image="+runnerImage, "runnerGroupSpec="+specInStr, ), - nodeSelectorsAppiler, + appiler, ) if err != nil { return fmt.Errorf("failed to create helm release client: %w", err) @@ -88,16 +100,60 @@ func tweakAndMarshalSpec(spec *types.RunnerGroupSpec) (string, error) { return string(data), nil } -// renderNodeSelectors renders labels into YAML string. -func renderNodeSelectors(labels map[string][]string) (string, error) { - // NOTE: It should be aligned with ../manifests/runnergroup/server/values.yaml. - target := map[string]interface{}{ - "nodeSelectors": labels, +type runCmdConfig struct { + // serverNodeSelectors forces to schedule server to nodes with that specific labels. + serverNodeSelectors map[string][]string + // runnerGroupFlowcontrol applies flowcontrol settings to runners. + // + // NOTE: Please align with ../manifests/runnergroup/server/values.yaml + // + // FIXME(weifu): before v1.0.0, we should define type in ../manifests. + runnerGroupFlowcontrol struct { + priorityLevel string + matchingPrecedence int + } + + // TODO(weifu): merge name/image/specs into this +} + +// RunCmdOpt is used to update default run command's setting. +type RunCmdOpt func(*runCmdConfig) + +// WithRunCmdServerNodeSelectorsOpt updates server's node selectors. +func WithRunCmdServerNodeSelectorsOpt(labels map[string][]string) RunCmdOpt { + return func(cfg *runCmdConfig) { + cfg.serverNodeSelectors = labels + } +} + +// WithRunCmdRunnerGroupFlowControl updates runner groups' flowcontrol. +func WithRunCmdRunnerGroupFlowControl(priorityLevel string, matchingPrecedence int) RunCmdOpt { + return func(cfg *runCmdConfig) { + cfg.runnerGroupFlowcontrol.priorityLevel = priorityLevel + cfg.runnerGroupFlowcontrol.matchingPrecedence = matchingPrecedence + } +} + +// toServerHelmValuesAppiler creates ValuesApplier. +// +// NOTE: It should be aligned with ../manifests/runnergroup/server/values.yaml. +func (cfg *runCmdConfig) toServerHelmValuesAppiler() (helmcli.ValuesApplier, error) { + values := map[string]interface{}{ + "nodeSelectors": cfg.serverNodeSelectors, + "flowcontrol": map[string]interface{}{ + "priorityLevelConfiguration": cfg.runnerGroupFlowcontrol.priorityLevel, + "matchingPrecedence": cfg.runnerGroupFlowcontrol.matchingPrecedence, + }, + } + + rawData, err := yaml.Marshal(values) + if err != nil { + return nil, fmt.Errorf("failed to render run command config into YAML: %w", err) } - rawData, err := yaml.Marshal(target) + appiler, err := helmcli.YAMLValuesApplier(string(rawData)) if err != nil { - return "", fmt.Errorf("failed to render nodeSelectors: %w", err) + return nil, fmt.Errorf("failed to prepare value appiler for run command config: %w", err) } - return string(rawData), nil + return appiler, nil }