Skip to content

Commit

Permalink
Add Flyin propeller config (#4610)
Browse files Browse the repository at this point in the history
* Revert "Detect subNode phase updates to reduce evaluation frequency of ArrayNode (#4535)"

This reverts commit b50ba87.

Signed-off-by: Eduardo Apolinario <[email protected]>

* Add flyin pflags

Signed-off-by: Eduardo Apolinario <[email protected]>

* Add taskTemplate as parameter to GetLogsForContainerInPod

Signed-off-by: Eduardo Apolinario <[email protected]>

* Add flyin template scheme and unit tests

Signed-off-by: Eduardo Apolinario <[email protected]>

* Revert unintended change.

Signed-off-by: Eduardo Apolinario <[email protected]>

* Lint

Signed-off-by: Eduardo Apolinario <[email protected]>

* Fix bad refactor due to lint warning

Signed-off-by: Eduardo Apolinario <[email protected]>

* Remove TODOs

Signed-off-by: Eduardo Apolinario <[email protected]>

---------

Signed-off-by: Eduardo Apolinario <[email protected]>
Co-authored-by: Eduardo Apolinario <[email protected]>
  • Loading branch information
eapolinario and eapolinario authored Dec 20, 2023
1 parent 3648440 commit 3501675
Show file tree
Hide file tree
Showing 10 changed files with 258 additions and 20 deletions.
3 changes: 3 additions & 0 deletions flyteplugins/go/tasks/logs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ type LogConfig struct {
StackdriverLogResourceName string `json:"stackdriver-logresourcename" pflag:",Name of the logresource in stackdriver"`
StackDriverTemplateURI tasklog.TemplateURI `json:"stackdriver-template-uri" pflag:",Template Uri to use when building stackdriver log links"`

IsFlyinEnabled bool `json:"flyin-enabled" pflag:",Enable Log-links to flyin logs"`
FlyinTemplateURI tasklog.TemplateURI `json:"flyin-template-uri" pflag:",Template Uri to use when building flyin log links"`

Templates []tasklog.TemplateLogPlugin `json:"templates" pflag:"-,"`
}

Expand Down
2 changes: 2 additions & 0 deletions flyteplugins/go/tasks/logs/logconfig_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions flyteplugins/go/tasks/logs/logconfig_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion flyteplugins/go/tasks/logs/logging_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

// Internal
func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, taskExecID pluginsCore.TaskExecutionID, pod *v1.Pod, index uint32, nameSuffix string, extraLogTemplateVarsByScheme *tasklog.TemplateVarsByScheme) ([]*core.TaskLog, error) {
func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, taskExecID pluginsCore.TaskExecutionID, pod *v1.Pod, index uint32, nameSuffix string, extraLogTemplateVarsByScheme *tasklog.TemplateVarsByScheme, taskTemplate *core.TaskTemplate) ([]*core.TaskLog, error) {
if logPlugin == nil {
return nil, nil
}
Expand Down Expand Up @@ -51,6 +51,7 @@ func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, tas
PodUnixFinishTime: finishTime,
TaskExecutionID: taskExecID,
ExtraTemplateVarsByScheme: extraLogTemplateVarsByScheme,
TaskTemplate: taskTemplate,
},
)

Expand Down Expand Up @@ -108,6 +109,14 @@ func InitializeLogPlugins(cfg *LogConfig) (tasklog.Plugin, error) {
}
}

if cfg.IsFlyinEnabled {
if len(cfg.FlyinTemplateURI) > 0 {
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Flyin Logs", Scheme: tasklog.TemplateSchemeFlyin, TemplateURIs: []tasklog.TemplateURI{cfg.FlyinTemplateURI}, MessageFormat: core.TaskLog_JSON})
} else {
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Flyin Logs", Scheme: tasklog.TemplateSchemeFlyin, TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("https://flyin.%s/logs/{{ .namespace }}/{{ .podName }}/{{ .containerName }}/{{ .containerId }}", cfg.GCPProjectName)}, MessageFormat: core.TaskLog_JSON})
}
}

plugins = append(plugins, cfg.Templates...)
return templateLogPluginCollection{plugins: plugins}, nil
}
54 changes: 41 additions & 13 deletions flyteplugins/go/tasks/logs/logging_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func dummyTaskExecID() pluginCore.TaskExecutionID {
func TestGetLogsForContainerInPod_NoPlugins(t *testing.T) {
logPlugin, err := InitializeLogPlugins(&LogConfig{})
assert.NoError(t, err)
l, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), nil, 0, " Suffix", nil)
l, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), nil, 0, " Suffix", nil, nil)
assert.NoError(t, err)
assert.Nil(t, l)
}
Expand All @@ -56,7 +56,7 @@ func TestGetLogsForContainerInPod_NoLogs(t *testing.T) {
CloudwatchLogGroup: "/kubernetes/flyte-production",
})
assert.NoError(t, err)
p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), nil, 0, " Suffix", nil)
p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), nil, 0, " Suffix", nil, nil)
assert.NoError(t, err)
assert.Nil(t, p)
}
Expand Down Expand Up @@ -87,7 +87,7 @@ func TestGetLogsForContainerInPod_BadIndex(t *testing.T) {
}
pod.Name = podName

p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 1, " Suffix", nil)
p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 1, " Suffix", nil, nil)
assert.NoError(t, err)
assert.Nil(t, p)
}
Expand All @@ -112,7 +112,7 @@ func TestGetLogsForContainerInPod_MissingStatus(t *testing.T) {
}
pod.Name = podName

p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 1, " Suffix", nil)
p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 1, " Suffix", nil, nil)
assert.NoError(t, err)
assert.Nil(t, p)
}
Expand Down Expand Up @@ -142,7 +142,7 @@ func TestGetLogsForContainerInPod_Cloudwatch(t *testing.T) {
}
pod.Name = podName

logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil)
logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil, nil)
assert.Nil(t, err)
assert.Len(t, logs, 1)
}
Expand Down Expand Up @@ -172,7 +172,7 @@ func TestGetLogsForContainerInPod_K8s(t *testing.T) {
}
pod.Name = podName

logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil)
logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil, nil)
assert.Nil(t, err)
assert.Len(t, logs, 1)
}
Expand Down Expand Up @@ -205,7 +205,7 @@ func TestGetLogsForContainerInPod_All(t *testing.T) {
}
pod.Name = podName

logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil)
logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil, nil)
assert.Nil(t, err)
assert.Len(t, logs, 2)
}
Expand Down Expand Up @@ -236,7 +236,7 @@ func TestGetLogsForContainerInPod_Stackdriver(t *testing.T) {
}
pod.Name = podName

logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil)
logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil, nil)
assert.Nil(t, err)
assert.Len(t, logs, 1)
}
Expand All @@ -252,7 +252,7 @@ func TestGetLogsForContainerInPod_LegacyTemplate(t *testing.T) {

IsStackDriverEnabled: true,
StackDriverTemplateURI: "https://sd-my-log-server/{{ .namespace }}/{{ .podName }}/{{ .containerName }}/{{ .containerId }}",
}, []*core.TaskLog{
}, nil, []*core.TaskLog{
{
Uri: "https://k8s-my-log-server/my-namespace/my-pod/ContainerName/ContainerID",
MessageFormat: core.TaskLog_JSON,
Expand All @@ -275,7 +275,7 @@ func TestGetLogsForContainerInPod_LegacyTemplate(t *testing.T) {
assertTestSucceeded(t, &LogConfig{
IsStackDriverEnabled: true,
StackDriverTemplateURI: "https://sd-my-log-server/{{ .namespace }}/{{ .podName }}/{{ .containerName }}/{{ .containerId }}",
}, []*core.TaskLog{
}, nil, []*core.TaskLog{
{
Uri: "https://sd-my-log-server/my-namespace/my-pod/ContainerName/ContainerID",
MessageFormat: core.TaskLog_JSON,
Expand All @@ -285,7 +285,7 @@ func TestGetLogsForContainerInPod_LegacyTemplate(t *testing.T) {
})
}

func assertTestSucceeded(tb testing.TB, config *LogConfig, expectedTaskLogs []*core.TaskLog) {
func assertTestSucceeded(tb testing.TB, config *LogConfig, taskTemplate *core.TaskTemplate, expectedTaskLogs []*core.TaskLog) {
logPlugin, err := InitializeLogPlugins(config)
assert.NoError(tb, err)

Expand All @@ -310,7 +310,7 @@ func assertTestSucceeded(tb testing.TB, config *LogConfig, expectedTaskLogs []*c
},
}

logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " my-Suffix", nil)
logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " my-Suffix", nil, taskTemplate)
assert.Nil(tb, err)
assert.Len(tb, logs, len(expectedTaskLogs))
if diff := deep.Equal(logs, expectedTaskLogs); len(diff) > 0 {
Expand All @@ -337,7 +337,7 @@ func TestGetLogsForContainerInPod_Templates(t *testing.T) {
Scheme: tasklog.TemplateSchemeTaskExecution,
},
},
}, []*core.TaskLog{
}, nil, []*core.TaskLog{
{
Uri: "https://my-log-server/my-namespace/my-pod/ContainerName/ContainerID",
MessageFormat: core.TaskLog_JSON,
Expand All @@ -350,3 +350,31 @@ func TestGetLogsForContainerInPod_Templates(t *testing.T) {
},
})
}

func TestGetLogsForContainerInPod_Flyin(t *testing.T) {
assertTestSucceeded(t,
&LogConfig{
IsKubernetesEnabled: true,
KubernetesTemplateURI: "https://k8s.com",
IsFlyinEnabled: true,
FlyinTemplateURI: "https://flyin.mydomain.com:{{ .port }}/{{ .namespace }}/{{ .podName }}/{{ .containerName }}/{{ .containerId }}",
},
&core.TaskTemplate{
Config: map[string]string{
"link_type": "vscode",
"port": "65535",
},
},
[]*core.TaskLog{
{
Uri: "https://k8s.com",
MessageFormat: core.TaskLog_JSON,
Name: "Kubernetes Logs my-Suffix",
},
{
Uri: "https://flyin.mydomain.com:65535/my-namespace/my-pod/ContainerName/ContainerID",
MessageFormat: core.TaskLog_JSON,
Name: "Flyin Logs my-Suffix",
},
})
}
3 changes: 3 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type TemplateScheme int
const (
TemplateSchemePod TemplateScheme = iota
TemplateSchemeTaskExecution
TemplateSchemeFlyin
)

// TemplateURI is a URI that accepts templates. See: go/tasks/pluginmachinery/tasklog/template.go for available templates.
Expand All @@ -30,6 +31,7 @@ type TemplateVarsByScheme struct {
Common TemplateVars
Pod TemplateVars
TaskExecution TemplateVars
Flyin TemplateVars
}

// Input contains all available information about task's execution that a log plugin can use to construct task's
Expand All @@ -48,6 +50,7 @@ type Input struct {
PodUID string
TaskExecutionID pluginsCore.TaskExecutionID
ExtraTemplateVarsByScheme *TemplateVarsByScheme
TaskTemplate *core.TaskTemplate
}

// Output contains all task logs a plugin generates for a given Input.
Expand Down
27 changes: 27 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/tasklog/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type templateRegexes struct {
ExecutionProject *regexp.Regexp
ExecutionDomain *regexp.Regexp
GeneratedName *regexp.Regexp
Port *regexp.Regexp
}

func initDefaultRegexes() templateRegexes {
Expand All @@ -60,6 +61,7 @@ func initDefaultRegexes() templateRegexes {
MustCreateRegex("executionProject"),
MustCreateRegex("executionDomain"),
MustCreateRegex("generatedName"),
MustCreateRegex("port"),
}
}

Expand All @@ -85,6 +87,16 @@ func (input Input) templateVarsForScheme(scheme TemplateScheme) TemplateVars {
}

switch scheme {
case TemplateSchemeFlyin:
port := input.TaskTemplate.GetConfig()["port"]
if port == "" {
port = "8080"
}
vars = append(
vars,
TemplateVar{defaultRegexes.Port, port},
)
fallthrough
case TemplateSchemePod:
// Container IDs are prefixed with docker://, cri-o://, etc. which is stripped by fluentd before pushing to a log
// stream. Therefore, we must also strip the prefix.
Expand Down Expand Up @@ -181,7 +193,22 @@ func (input Input) templateVarsForScheme(scheme TemplateScheme) TemplateVars {
func (p TemplateLogPlugin) GetTaskLogs(input Input) (Output, error) {
templateVars := input.templateVarsForScheme(p.Scheme)
taskLogs := make([]*core.TaskLog, 0, len(p.TemplateURIs))

// Grab metadata from task template and check if key "link_type" is set to "vscode".
// If so, add a vscode link to the task logs.
isFlyin := false
if input.TaskTemplate != nil && input.TaskTemplate.GetConfig() != nil {
config := input.TaskTemplate.GetConfig()
if config != nil && config["link_type"] == "vscode" {
isFlyin = true
}
}
for _, templateURI := range p.TemplateURIs {
// Skip Flyin logs if plugin is enabled but no metadata is defined in input's task template.
// This is to prevent Flyin logs from being generated for tasks that don't have a Flyin metadata section.
if p.DisplayName == "Flyin Logs" && !isFlyin {
continue
}
taskLogs = append(taskLogs, &core.TaskLog{
Uri: replaceAll(templateURI, templateVars),
Name: p.DisplayName + input.LogName,
Expand Down
Loading

0 comments on commit 3501675

Please sign in to comment.