From c6c30511b615b285be9f5a5b002af0579415797f Mon Sep 17 00:00:00 2001 From: Eduardo Apolinario Date: Tue, 12 Dec 2023 17:42:14 -0800 Subject: [PATCH 1/8] Revert "Detect subNode phase updates to reduce evaluation frequency of ArrayNode (#4535)" This reverts commit b50ba877e4632826dd4d5bc0978a41d39c8d172a. Signed-off-by: Eduardo Apolinario --- .../pkg/controller/nodes/array/handler.go | 20 ++++++++----------- .../pkg/controller/nodes/executor.go | 2 +- flytestdlib/storage/cached_rawstore.go | 3 --- 3 files changed, 9 insertions(+), 16 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index 00a9fc747e..dddcd0e7c5 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -169,7 +169,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu arrayNodeState := nCtx.NodeStateReader().GetArrayNodeState() currentArrayNodePhase := arrayNodeState.Phase - incrementTaskPhaseVersion := false + taskPhaseVersion := arrayNodeState.TaskPhaseVersion eventRecorder := newArrayEventRecorder(nCtx.EventsRecorder()) switch currentArrayNodePhase { @@ -246,7 +246,6 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu messageCollector := errorcollector.NewErrorMessageCollector() for i, nodePhaseUint64 := range arrayNodeState.SubNodePhases.GetItems() { nodePhase := v1alpha1.NodePhase(nodePhaseUint64) - taskPhase := int(arrayNodeState.SubNodeTaskPhases.GetItem(i)) // do not process nodes in terminal state if isTerminalNodePhase(nodePhase) { @@ -284,11 +283,6 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu } arrayNodeState.SubNodeRetryAttempts.SetItem(i, uint64(subNodeStatus.GetAttempts())) arrayNodeState.SubNodeSystemFailures.SetItem(i, uint64(subNodeStatus.GetSystemFailures())) - - // increment task phase version if subNode phase or task phase changed - if subNodeStatus.GetPhase() != nodePhase || subNodeStatus.GetTaskNodeStatus().GetPhase() != taskPhase { - incrementTaskPhaseVersion = true - } } // process phases of subNodes to determine overall `ArrayNode` phase @@ -435,15 +429,17 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu taskPhase = idlcore.TaskExecution_FAILED } - // if the ArrayNode phase has changed we need to reset the taskPhaseVersion to 0, otherwise - // increment it if we detect any changes in subNode state. + // need to increment taskPhaseVersion if arrayNodeState.Phase does not change, otherwise + // reset to 0. by incrementing this always we report an event and ensure processing + // every time the ArrayNode is evaluated. if this overhead becomes too large, we will need + // to revisit and only increment when any subNode state changes. if currentArrayNodePhase != arrayNodeState.Phase { arrayNodeState.TaskPhaseVersion = 0 - } else if incrementTaskPhaseVersion { - arrayNodeState.TaskPhaseVersion = arrayNodeState.TaskPhaseVersion + 1 + } else { + arrayNodeState.TaskPhaseVersion = taskPhaseVersion + 1 } - if err := eventRecorder.finalize(ctx, nCtx, taskPhase, arrayNodeState.TaskPhaseVersion, a.eventConfig); err != nil { + if err := eventRecorder.finalize(ctx, nCtx, taskPhase, taskPhaseVersion, a.eventConfig); err != nil { logger.Errorf(ctx, "ArrayNode event recording failed: [%s]", err.Error()) return handler.UnknownTransition, err } diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index 23062a8cb3..8e96ee9645 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -753,7 +753,7 @@ func (c *nodeExecutor) preExecute(ctx context.Context, dag executors.DAGStructur return handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, "BindingResolutionFailure", err.Error(), nil), nil } - if nodeInputs != nil && len(nodeInputs.Literals) > 0 { + if nodeInputs != nil { inputsFile := v1alpha1.GetInputsFile(dataDir) if err := c.store.WriteProtobuf(ctx, inputsFile, storage.Options{}, nodeInputs); err != nil { c.metrics.InputsWriteFailure.Inc(ctx) diff --git a/flytestdlib/storage/cached_rawstore.go b/flytestdlib/storage/cached_rawstore.go index 913a517a0f..a37a4cdf6b 100644 --- a/flytestdlib/storage/cached_rawstore.go +++ b/flytestdlib/storage/cached_rawstore.go @@ -35,9 +35,6 @@ type cachedRawStore struct { // Head gets metadata about the reference. This should generally be a lightweight operation. func (s *cachedRawStore) Head(ctx context.Context, reference DataReference) (Metadata, error) { - ctx, span := otelutils.NewSpan(ctx, otelutils.BlobstoreClientTracer, "flytestdlib.storage.cachedRawStore/Head") - defer span.End() - key := []byte(reference) if oRaw, err := s.cache.Get(key); err == nil { s.metrics.CacheHit.Inc() From 68d4b34b8250ad09d5d3fb3c063511d0aa3bf833 Mon Sep 17 00:00:00 2001 From: Eduardo Apolinario Date: Mon, 18 Dec 2023 12:19:31 -0300 Subject: [PATCH 2/8] Add flyin pflags Signed-off-by: Eduardo Apolinario --- flyteplugins/go/tasks/logs/config.go | 3 ++ flyteplugins/go/tasks/logs/logconfig_flags.go | 2 ++ .../go/tasks/logs/logconfig_flags_test.go | 28 +++++++++++++++++++ 3 files changed, 33 insertions(+) diff --git a/flyteplugins/go/tasks/logs/config.go b/flyteplugins/go/tasks/logs/config.go index ca5a6012a8..b802844a4a 100644 --- a/flyteplugins/go/tasks/logs/config.go +++ b/flyteplugins/go/tasks/logs/config.go @@ -28,6 +28,9 @@ type LogConfig struct { StackdriverLogResourceName string `json:"stackdriver-logresourcename" pflag:",Name of the logresource in stackdriver"` StackDriverTemplateURI tasklog.TemplateURI `json:"stackdriver-template-uri" pflag:",Template Uri to use when building stackdriver log links"` + IsFlyinEnabled bool `json:"flyin-enabled" pflag:",Enable Log-links to flyin logs"` + FlyinTemplateURI tasklog.TemplateURI `json:"flyin-template-uri" pflag:",Template Uri to use when building flyin log links"` + Templates []tasklog.TemplateLogPlugin `json:"templates" pflag:"-,"` } diff --git a/flyteplugins/go/tasks/logs/logconfig_flags.go b/flyteplugins/go/tasks/logs/logconfig_flags.go index 00c08a8a58..de8ba022dc 100755 --- a/flyteplugins/go/tasks/logs/logconfig_flags.go +++ b/flyteplugins/go/tasks/logs/logconfig_flags.go @@ -61,5 +61,7 @@ func (cfg LogConfig) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.String(fmt.Sprintf("%v%v", prefix, "gcp-project"), DefaultConfig.GCPProjectName, "Name of the project in GCP") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "stackdriver-logresourcename"), DefaultConfig.StackdriverLogResourceName, "Name of the logresource in stackdriver") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "stackdriver-template-uri"), DefaultConfig.StackDriverTemplateURI, "Template Uri to use when building stackdriver log links") + cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "flyin-enabled"), DefaultConfig.IsFlyinEnabled, "Enable Log-links to flyin logs") + cmdFlags.String(fmt.Sprintf("%v%v", prefix, "flyin-template-uri"), DefaultConfig.FlyinTemplateURI, "Template Uri to use when building flyin log links") return cmdFlags } diff --git a/flyteplugins/go/tasks/logs/logconfig_flags_test.go b/flyteplugins/go/tasks/logs/logconfig_flags_test.go index 8bb775df1f..dfbee43c69 100755 --- a/flyteplugins/go/tasks/logs/logconfig_flags_test.go +++ b/flyteplugins/go/tasks/logs/logconfig_flags_test.go @@ -253,4 +253,32 @@ func TestLogConfig_SetFlags(t *testing.T) { } }) }) + t.Run("Test_flyin-enabled", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("flyin-enabled", testValue) + if vBool, err := cmdFlags.GetBool("flyin-enabled"); err == nil { + testDecodeJson_LogConfig(t, fmt.Sprintf("%v", vBool), &actual.IsFlyinEnabled) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) + t.Run("Test_flyin-template-uri", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("flyin-template-uri", testValue) + if vString, err := cmdFlags.GetString("flyin-template-uri"); err == nil { + testDecodeJson_LogConfig(t, fmt.Sprintf("%v", vString), &actual.FlyinTemplateURI) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) } From 2ea35cd762b81889e638e1471696b40383c10232 Mon Sep 17 00:00:00 2001 From: Eduardo Apolinario Date: Mon, 18 Dec 2023 12:20:25 -0300 Subject: [PATCH 3/8] Add taskTemplate as parameter to GetLogsForContainerInPod Signed-off-by: Eduardo Apolinario --- flyteplugins/go/tasks/logs/logging_utils.go | 12 ++++- .../go/tasks/logs/logging_utils_test.go | 54 ++++++++++++++----- .../go/tasks/plugins/k8s/pod/plugin.go | 7 ++- 3 files changed, 58 insertions(+), 15 deletions(-) diff --git a/flyteplugins/go/tasks/logs/logging_utils.go b/flyteplugins/go/tasks/logs/logging_utils.go index 4978109458..a82963a652 100644 --- a/flyteplugins/go/tasks/logs/logging_utils.go +++ b/flyteplugins/go/tasks/logs/logging_utils.go @@ -14,7 +14,7 @@ import ( ) // Internal -func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, taskExecID pluginsCore.TaskExecutionID, pod *v1.Pod, index uint32, nameSuffix string, extraLogTemplateVarsByScheme *tasklog.TemplateVarsByScheme) ([]*core.TaskLog, error) { +func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, taskExecID pluginsCore.TaskExecutionID, pod *v1.Pod, index uint32, nameSuffix string, extraLogTemplateVarsByScheme *tasklog.TemplateVarsByScheme, taskTemplate *core.TaskTemplate) ([]*core.TaskLog, error) { if logPlugin == nil { return nil, nil } @@ -51,6 +51,7 @@ func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, tas PodUnixFinishTime: finishTime, TaskExecutionID: taskExecID, ExtraTemplateVarsByScheme: extraLogTemplateVarsByScheme, + TaskTemplate: taskTemplate, }, ) @@ -108,6 +109,15 @@ func InitializeLogPlugins(cfg *LogConfig) (tasklog.Plugin, error) { } } + if cfg.IsFlyinEnabled { + if len(cfg.FlyinTemplateURI) > 0 { + plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Flyin Logs", Scheme: tasklog.TemplateSchemeFlyin, TemplateURIs: []tasklog.TemplateURI{cfg.FlyinTemplateURI}, MessageFormat: core.TaskLog_JSON}) + } else { + // TODO: Figure out what to use a default here. + plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Flyin Logs", Scheme: tasklog.TemplateSchemeFlyin, TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("https://flyin.%s/logs/{{ .namespace }}/{{ .podName }}/{{ .containerName }}/{{ .containerId }}", cfg.GCPProjectName)}, MessageFormat: core.TaskLog_JSON}) + } + } + plugins = append(plugins, cfg.Templates...) return templateLogPluginCollection{plugins: plugins}, nil } diff --git a/flyteplugins/go/tasks/logs/logging_utils_test.go b/flyteplugins/go/tasks/logs/logging_utils_test.go index 91048eff16..46eb682201 100644 --- a/flyteplugins/go/tasks/logs/logging_utils_test.go +++ b/flyteplugins/go/tasks/logs/logging_utils_test.go @@ -44,7 +44,7 @@ func dummyTaskExecID() pluginCore.TaskExecutionID { func TestGetLogsForContainerInPod_NoPlugins(t *testing.T) { logPlugin, err := InitializeLogPlugins(&LogConfig{}) assert.NoError(t, err) - l, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), nil, 0, " Suffix", nil) + l, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), nil, 0, " Suffix", nil, nil) assert.NoError(t, err) assert.Nil(t, l) } @@ -56,7 +56,7 @@ func TestGetLogsForContainerInPod_NoLogs(t *testing.T) { CloudwatchLogGroup: "/kubernetes/flyte-production", }) assert.NoError(t, err) - p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), nil, 0, " Suffix", nil) + p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), nil, 0, " Suffix", nil, nil) assert.NoError(t, err) assert.Nil(t, p) } @@ -87,7 +87,7 @@ func TestGetLogsForContainerInPod_BadIndex(t *testing.T) { } pod.Name = podName - p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 1, " Suffix", nil) + p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 1, " Suffix", nil, nil) assert.NoError(t, err) assert.Nil(t, p) } @@ -112,7 +112,7 @@ func TestGetLogsForContainerInPod_MissingStatus(t *testing.T) { } pod.Name = podName - p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 1, " Suffix", nil) + p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 1, " Suffix", nil, nil) assert.NoError(t, err) assert.Nil(t, p) } @@ -142,7 +142,7 @@ func TestGetLogsForContainerInPod_Cloudwatch(t *testing.T) { } pod.Name = podName - logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil) + logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil, nil) assert.Nil(t, err) assert.Len(t, logs, 1) } @@ -172,7 +172,7 @@ func TestGetLogsForContainerInPod_K8s(t *testing.T) { } pod.Name = podName - logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil) + logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil, nil) assert.Nil(t, err) assert.Len(t, logs, 1) } @@ -205,7 +205,7 @@ func TestGetLogsForContainerInPod_All(t *testing.T) { } pod.Name = podName - logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil) + logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil, nil) assert.Nil(t, err) assert.Len(t, logs, 2) } @@ -236,7 +236,7 @@ func TestGetLogsForContainerInPod_Stackdriver(t *testing.T) { } pod.Name = podName - logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil) + logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil, nil) assert.Nil(t, err) assert.Len(t, logs, 1) } @@ -252,7 +252,7 @@ func TestGetLogsForContainerInPod_LegacyTemplate(t *testing.T) { IsStackDriverEnabled: true, StackDriverTemplateURI: "https://sd-my-log-server/{{ .namespace }}/{{ .podName }}/{{ .containerName }}/{{ .containerId }}", - }, []*core.TaskLog{ + }, nil, []*core.TaskLog{ { Uri: "https://k8s-my-log-server/my-namespace/my-pod/ContainerName/ContainerID", MessageFormat: core.TaskLog_JSON, @@ -275,7 +275,7 @@ func TestGetLogsForContainerInPod_LegacyTemplate(t *testing.T) { assertTestSucceeded(t, &LogConfig{ IsStackDriverEnabled: true, StackDriverTemplateURI: "https://sd-my-log-server/{{ .namespace }}/{{ .podName }}/{{ .containerName }}/{{ .containerId }}", - }, []*core.TaskLog{ + }, nil, []*core.TaskLog{ { Uri: "https://sd-my-log-server/my-namespace/my-pod/ContainerName/ContainerID", MessageFormat: core.TaskLog_JSON, @@ -285,7 +285,7 @@ func TestGetLogsForContainerInPod_LegacyTemplate(t *testing.T) { }) } -func assertTestSucceeded(tb testing.TB, config *LogConfig, expectedTaskLogs []*core.TaskLog) { +func assertTestSucceeded(tb testing.TB, config *LogConfig, taskTemplate *core.TaskTemplate, expectedTaskLogs []*core.TaskLog) { logPlugin, err := InitializeLogPlugins(config) assert.NoError(tb, err) @@ -310,7 +310,7 @@ func assertTestSucceeded(tb testing.TB, config *LogConfig, expectedTaskLogs []*c }, } - logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " my-Suffix", nil) + logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " my-Suffix", nil, taskTemplate) assert.Nil(tb, err) assert.Len(tb, logs, len(expectedTaskLogs)) if diff := deep.Equal(logs, expectedTaskLogs); len(diff) > 0 { @@ -337,7 +337,7 @@ func TestGetLogsForContainerInPod_Templates(t *testing.T) { Scheme: tasklog.TemplateSchemeTaskExecution, }, }, - }, []*core.TaskLog{ + }, nil, []*core.TaskLog{ { Uri: "https://my-log-server/my-namespace/my-pod/ContainerName/ContainerID", MessageFormat: core.TaskLog_JSON, @@ -350,3 +350,31 @@ func TestGetLogsForContainerInPod_Templates(t *testing.T) { }, }) } + +func TestGetLogsForContainerInPod_Flyin(t *testing.T) { + assertTestSucceeded(t, + &LogConfig{ + IsKubernetesEnabled: true, + KubernetesTemplateURI: "https://k8s.com", + IsFlyinEnabled: true, + FlyinTemplateURI: "https://flyin.mydomain.com:{{ .port }}/{{ .namespace }}/{{ .podName }}/{{ .containerName }}/{{ .containerId }}", + }, + &core.TaskTemplate{ + Config: map[string]string{ + "link_type": "vscode", + "port": "65535", + }, + }, + []*core.TaskLog{ + { + Uri: "https://k8s.com", + MessageFormat: core.TaskLog_JSON, + Name: "Kubernetes Logs my-Suffix", + }, + { + Uri: "https://flyin.mydomain.com:65535/my-namespace/my-pod/ContainerName/ContainerID", + MessageFormat: core.TaskLog_JSON, + Name: "Flyin Logs my-Suffix", + }, + }) +} diff --git a/flyteplugins/go/tasks/plugins/k8s/pod/plugin.go b/flyteplugins/go/tasks/plugins/k8s/pod/plugin.go index b266a6f5e8..eae0ac98b7 100644 --- a/flyteplugins/go/tasks/plugins/k8s/pod/plugin.go +++ b/flyteplugins/go/tasks/plugins/k8s/pod/plugin.go @@ -153,6 +153,11 @@ func (plugin) GetTaskPhaseWithLogs(ctx context.Context, pluginContext k8s.Plugin return pluginsCore.PhaseInfoUndefined, err } + taskTemplate, err := pluginContext.TaskReader().Read(ctx) + if err != nil { + return pluginsCore.PhaseInfoUndefined, err + } + pod := r.(*v1.Pod) transitionOccurredAt := flytek8s.GetLastTransitionOccurredAt(pod).Time @@ -168,7 +173,7 @@ func (plugin) GetTaskPhaseWithLogs(ctx context.Context, pluginContext k8s.Plugin taskExecID := pluginContext.TaskExecutionMetadata().GetTaskExecutionID() if pod.Status.Phase != v1.PodPending && pod.Status.Phase != v1.PodUnknown { - taskLogs, err := logs.GetLogsForContainerInPod(ctx, logPlugin, taskExecID, pod, 0, logSuffix, extraLogTemplateVarsByScheme) + taskLogs, err := logs.GetLogsForContainerInPod(ctx, logPlugin, taskExecID, pod, 0, logSuffix, extraLogTemplateVarsByScheme, taskTemplate) if err != nil { return pluginsCore.PhaseInfoUndefined, err } From 28df4f248eb861b831d5e2d1abf7a69bb0ccd4a7 Mon Sep 17 00:00:00 2001 From: Eduardo Apolinario Date: Mon, 18 Dec 2023 12:20:56 -0300 Subject: [PATCH 4/8] Add flyin template scheme and unit tests Signed-off-by: Eduardo Apolinario --- .../tasks/pluginmachinery/tasklog/plugin.go | 3 + .../tasks/pluginmachinery/tasklog/template.go | 29 ++++ .../pluginmachinery/tasklog/template_test.go | 132 ++++++++++++++++++ .../tasklog/templatescheme_enumer.go | 11 +- 4 files changed, 170 insertions(+), 5 deletions(-) diff --git a/flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go b/flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go index c43da02e58..da2357a6d9 100644 --- a/flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go +++ b/flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go @@ -14,6 +14,7 @@ type TemplateScheme int const ( TemplateSchemePod TemplateScheme = iota TemplateSchemeTaskExecution + TemplateSchemeFlyin ) // TemplateURI is a URI that accepts templates. See: go/tasks/pluginmachinery/tasklog/template.go for available templates. @@ -30,6 +31,7 @@ type TemplateVarsByScheme struct { Common TemplateVars Pod TemplateVars TaskExecution TemplateVars + Flyin TemplateVars } // Input contains all available information about task's execution that a log plugin can use to construct task's @@ -48,6 +50,7 @@ type Input struct { PodUID string TaskExecutionID pluginsCore.TaskExecutionID ExtraTemplateVarsByScheme *TemplateVarsByScheme + TaskTemplate *core.TaskTemplate } // Output contains all task logs a plugin generates for a given Input. diff --git a/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go b/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go index 750b1972df..829b634084 100644 --- a/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go +++ b/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go @@ -35,6 +35,7 @@ type templateRegexes struct { ExecutionProject *regexp.Regexp ExecutionDomain *regexp.Regexp GeneratedName *regexp.Regexp + Port *regexp.Regexp } func initDefaultRegexes() templateRegexes { @@ -60,6 +61,7 @@ func initDefaultRegexes() templateRegexes { MustCreateRegex("executionProject"), MustCreateRegex("executionDomain"), MustCreateRegex("generatedName"), + MustCreateRegex("port"), } } @@ -85,6 +87,18 @@ func (input Input) templateVarsForScheme(scheme TemplateScheme) TemplateVars { } switch scheme { + case TemplateSchemeFlyin: + // TODO: Confirm that having a default port is okay. + port := input.TaskTemplate.GetConfig()["port"] + if port == "" { + port = "8081" + } + vars = append( + vars, + // Replace the port with the port from the task template. + TemplateVar{defaultRegexes.Port, port}, + ) + fallthrough case TemplateSchemePod: // Container IDs are prefixed with docker://, cri-o://, etc. which is stripped by fluentd before pushing to a log // stream. Therefore, we must also strip the prefix. @@ -181,7 +195,22 @@ func (input Input) templateVarsForScheme(scheme TemplateScheme) TemplateVars { func (p TemplateLogPlugin) GetTaskLogs(input Input) (Output, error) { templateVars := input.templateVarsForScheme(p.Scheme) taskLogs := make([]*core.TaskLog, 0, len(p.TemplateURIs)) + + // Grab metadata from task template and check if key "link_type" is set to "vscode". + // If so, add a vscode link to the task logs. + isFlyin := false + if input.TaskTemplate != nil && input.TaskTemplate.GetConfig() != nil { + config := input.TaskTemplate.GetConfig() + if config != nil && config["link_type"] == "vscode" { + isFlyin = true + } + } for _, templateURI := range p.TemplateURIs { + // Skip Flyin logs if plugin is enabled but no metadata is defined in input's task template. + // This is to prevent Flyin logs from being generated for tasks that don't have a Flyin metadata section. + if p.DisplayName == "Flyin Logs" && isFlyin == false { + continue + } taskLogs = append(taskLogs, &core.TaskLog{ Uri: replaceAll(templateURI, templateVars), Name: p.DisplayName + input.LogName, diff --git a/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go b/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go index f279707a3b..09637fb811 100644 --- a/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go +++ b/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go @@ -74,6 +74,24 @@ func Test_Input_templateVarsForScheme(t *testing.T) { LogName: "main_logs", TaskExecutionID: dummyTaskExecID(), } + flyinBase := Input{ + HostName: "my-host", + PodName: "my-pod", + PodUID: "my-pod-uid", + Namespace: "my-namespace", + ContainerName: "my-container", + ContainerID: "docker://containerID", + LogName: "main_logs", + PodRFC3339StartTime: "1970-01-01T01:02:03+01:00", + PodRFC3339FinishTime: "1970-01-01T04:25:45+01:00", + PodUnixStartTime: 123, + PodUnixFinishTime: 12345, + TaskTemplate: &core.TaskTemplate{ + Config: map[string]string{ + "port": "1234", + }, + }, + } tests := []struct { name string @@ -202,6 +220,50 @@ func Test_Input_templateVarsForScheme(t *testing.T) { {testRegexes.Baz, "baz"}, }, }, + { + "flyin happy path", + TemplateSchemeFlyin, + flyinBase, + nil, + nil, + TemplateVars{ + {defaultRegexes.Port, "1234"}, + }, + nil, + }, + { + "flyin and pod happy path", + TemplateSchemeFlyin, + flyinBase, + nil, + TemplateVars{ + {defaultRegexes.LogName, "main_logs"}, + {defaultRegexes.Port, "1234"}, + {defaultRegexes.PodName, "my-pod"}, + {defaultRegexes.PodUID, "my-pod-uid"}, + {defaultRegexes.Namespace, "my-namespace"}, + {defaultRegexes.ContainerName, "my-container"}, + {defaultRegexes.ContainerID, "containerID"}, + {defaultRegexes.Hostname, "my-host"}, + {defaultRegexes.PodRFC3339StartTime, "1970-01-01T01:02:03+01:00"}, + {defaultRegexes.PodRFC3339FinishTime, "1970-01-01T04:25:45+01:00"}, + {defaultRegexes.PodUnixStartTime, "123"}, + {defaultRegexes.PodUnixFinishTime, "12345"}, + }, + nil, + nil, + }, + { + "pod with port not affected", + TemplateSchemePod, + podBase, + nil, + nil, + nil, + TemplateVars{ + {defaultRegexes.Port, "1234"}, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -474,6 +536,76 @@ func TestTemplateLogPlugin(t *testing.T) { }, }, }, + { + "flyin", + TemplateLogPlugin{ + Scheme: TemplateSchemeFlyin, + TemplateURIs: []TemplateURI{"vscode://flyin:{{ .port }}/{{ .podName }}"}, + MessageFormat: core.TaskLog_JSON, + }, + args{ + input: Input{ + PodName: "my-pod-name", + TaskTemplate: &core.TaskTemplate{ + Config: map[string]string{ + "link_type": "vscode", + "port": "1234", + }, + }, + }, + }, + Output{ + TaskLogs: []*core.TaskLog{ + { + Uri: "vscode://flyin:1234/my-pod-name", + MessageFormat: core.TaskLog_JSON, + }, + }, + }, + }, + { + "flyin - default port", + TemplateLogPlugin{ + Scheme: TemplateSchemeFlyin, + TemplateURIs: []TemplateURI{"vscode://flyin:{{ .port }}/{{ .podName }}"}, + MessageFormat: core.TaskLog_JSON, + }, + args{ + input: Input{ + PodName: "my-pod-name", + TaskTemplate: &core.TaskTemplate{ + Config: map[string]string{ + "link_type": "vscode", + }, + }, + }, + }, + Output{ + TaskLogs: []*core.TaskLog{ + { + Uri: "vscode://flyin:8081/my-pod-name", + MessageFormat: core.TaskLog_JSON, + }, + }, + }, + }, + { + "flyin - no link_type in task template", + TemplateLogPlugin{ + Scheme: TemplateSchemeFlyin, + TemplateURIs: []TemplateURI{"vscode://flyin:{{ .port }}/{{ .podName }}"}, + MessageFormat: core.TaskLog_JSON, + DisplayName: "Flyin Logs", + }, + args{ + input: Input{ + PodName: "my-pod-name", + }, + }, + Output{ + TaskLogs: []*core.TaskLog{}, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/flyteplugins/go/tasks/pluginmachinery/tasklog/templatescheme_enumer.go b/flyteplugins/go/tasks/pluginmachinery/tasklog/templatescheme_enumer.go index 70f15faf01..c1f4d668c0 100644 --- a/flyteplugins/go/tasks/pluginmachinery/tasklog/templatescheme_enumer.go +++ b/flyteplugins/go/tasks/pluginmachinery/tasklog/templatescheme_enumer.go @@ -7,9 +7,9 @@ import ( "fmt" ) -const _TemplateSchemeName = "PodTaskExecution" +const _TemplateSchemeName = "PodTaskExecutionFlyin" -var _TemplateSchemeIndex = [...]uint8{0, 3, 16} +var _TemplateSchemeIndex = [...]uint8{0, 3, 16, 21} func (i TemplateScheme) String() string { if i < 0 || i >= TemplateScheme(len(_TemplateSchemeIndex)-1) { @@ -18,11 +18,12 @@ func (i TemplateScheme) String() string { return _TemplateSchemeName[_TemplateSchemeIndex[i]:_TemplateSchemeIndex[i+1]] } -var _TemplateSchemeValues = []TemplateScheme{0, 1} +var _TemplateSchemeValues = []TemplateScheme{0, 1, 2} var _TemplateSchemeNameToValueMap = map[string]TemplateScheme{ - _TemplateSchemeName[0:3]: 0, - _TemplateSchemeName[3:16]: 1, + _TemplateSchemeName[0:3]: 0, + _TemplateSchemeName[3:16]: 1, + _TemplateSchemeName[16:21]: 2, } // TemplateSchemeString retrieves an enum value from the enum constants string name. From 00f263b28907a8d07f9d330d53a681fc0f67b722 Mon Sep 17 00:00:00 2001 From: Eduardo Apolinario Date: Mon, 18 Dec 2023 12:39:59 -0300 Subject: [PATCH 5/8] Revert unintended change. Signed-off-by: Eduardo Apolinario --- flytestdlib/storage/cached_rawstore.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/flytestdlib/storage/cached_rawstore.go b/flytestdlib/storage/cached_rawstore.go index a37a4cdf6b..913a517a0f 100644 --- a/flytestdlib/storage/cached_rawstore.go +++ b/flytestdlib/storage/cached_rawstore.go @@ -35,6 +35,9 @@ type cachedRawStore struct { // Head gets metadata about the reference. This should generally be a lightweight operation. func (s *cachedRawStore) Head(ctx context.Context, reference DataReference) (Metadata, error) { + ctx, span := otelutils.NewSpan(ctx, otelutils.BlobstoreClientTracer, "flytestdlib.storage.cachedRawStore/Head") + defer span.End() + key := []byte(reference) if oRaw, err := s.cache.Get(key); err == nil { s.metrics.CacheHit.Inc() From bba2f70435cf73db8aec5e366119cdb703cd40ed Mon Sep 17 00:00:00 2001 From: Eduardo Apolinario Date: Mon, 18 Dec 2023 12:48:56 -0300 Subject: [PATCH 6/8] Lint Signed-off-by: Eduardo Apolinario --- flyteplugins/go/tasks/pluginmachinery/tasklog/template.go | 2 +- flyteplugins/go/tasks/plugins/array/awsbatch/executor.go | 2 +- flyteplugins/go/tasks/plugins/array/k8s/management.go | 8 ++++---- flyteplugins/go/tasks/plugins/webapi/agent/plugin_test.go | 2 +- .../pkg/controller/executors/failure_node_lookup_test.go | 3 ++- 5 files changed, 9 insertions(+), 8 deletions(-) diff --git a/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go b/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go index 829b634084..d6b0ec9a7d 100644 --- a/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go +++ b/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go @@ -208,7 +208,7 @@ func (p TemplateLogPlugin) GetTaskLogs(input Input) (Output, error) { for _, templateURI := range p.TemplateURIs { // Skip Flyin logs if plugin is enabled but no metadata is defined in input's task template. // This is to prevent Flyin logs from being generated for tasks that don't have a Flyin metadata section. - if p.DisplayName == "Flyin Logs" && isFlyin == false { + if p.DisplayName == "Flyin Logs" && isFlyin { continue } taskLogs = append(taskLogs, &core.TaskLog{ diff --git a/flyteplugins/go/tasks/plugins/array/awsbatch/executor.go b/flyteplugins/go/tasks/plugins/array/awsbatch/executor.go index 6c7a858be8..1e98736129 100644 --- a/flyteplugins/go/tasks/plugins/array/awsbatch/executor.go +++ b/flyteplugins/go/tasks/plugins/array/awsbatch/executor.go @@ -78,7 +78,7 @@ func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (c case arrayCore.PhaseAssembleFinalOutput: pluginState.State, err = array.AssembleFinalOutputs(ctx, e.outputAssembler, tCtx, arrayCore.PhaseSuccess, version+1, pluginState.State) - + case arrayCore.PhaseAbortSubTasks: fallthrough diff --git a/flyteplugins/go/tasks/plugins/array/k8s/management.go b/flyteplugins/go/tasks/plugins/array/k8s/management.go index 510f202e1a..d6abaaf74b 100644 --- a/flyteplugins/go/tasks/plugins/array/k8s/management.go +++ b/flyteplugins/go/tasks/plugins/array/k8s/management.go @@ -382,10 +382,10 @@ func TerminateSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, kube messageCollector.Collect(childIdx, err.Error()) } else { externalResources = append(externalResources, &core.ExternalResource{ - ExternalID: stCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), - Index: uint32(originalIdx), - RetryAttempt: uint32(retryAttempt), - Phase: core.PhaseAborted, + ExternalID: stCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), + Index: uint32(originalIdx), + RetryAttempt: uint32(retryAttempt), + Phase: core.PhaseAborted, }) } } diff --git a/flyteplugins/go/tasks/plugins/webapi/agent/plugin_test.go b/flyteplugins/go/tasks/plugins/webapi/agent/plugin_test.go index b1fdd61e79..48bff1e9e9 100644 --- a/flyteplugins/go/tasks/plugins/webapi/agent/plugin_test.go +++ b/flyteplugins/go/tasks/plugins/webapi/agent/plugin_test.go @@ -2,7 +2,6 @@ package agent import ( "context" - flyteidlcore "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "testing" "time" @@ -10,6 +9,7 @@ import ( "google.golang.org/grpc" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" + flyteidlcore "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" pluginsCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core" pluginCoreMocks "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core/mocks" webapiPlugin "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/webapi/mocks" diff --git a/flytepropeller/pkg/controller/executors/failure_node_lookup_test.go b/flytepropeller/pkg/controller/executors/failure_node_lookup_test.go index b2dfa32231..e9d6857ec4 100644 --- a/flytepropeller/pkg/controller/executors/failure_node_lookup_test.go +++ b/flytepropeller/pkg/controller/executors/failure_node_lookup_test.go @@ -4,9 +4,10 @@ import ( "context" "testing" + "github.com/stretchr/testify/assert" + "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks" - "github.com/stretchr/testify/assert" ) type nl struct { From eb5c81d195c7b41e75415f02d6b0d3339ace0748 Mon Sep 17 00:00:00 2001 From: Eduardo Apolinario Date: Mon, 18 Dec 2023 13:03:25 -0300 Subject: [PATCH 7/8] Fix bad refactor due to lint warning Signed-off-by: Eduardo Apolinario --- flyteplugins/go/tasks/pluginmachinery/tasklog/template.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go b/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go index d6b0ec9a7d..89d9d1ce18 100644 --- a/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go +++ b/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go @@ -208,7 +208,7 @@ func (p TemplateLogPlugin) GetTaskLogs(input Input) (Output, error) { for _, templateURI := range p.TemplateURIs { // Skip Flyin logs if plugin is enabled but no metadata is defined in input's task template. // This is to prevent Flyin logs from being generated for tasks that don't have a Flyin metadata section. - if p.DisplayName == "Flyin Logs" && isFlyin { + if p.DisplayName == "Flyin Logs" && !isFlyin { continue } taskLogs = append(taskLogs, &core.TaskLog{ From 40213874a64ad4b7b8172be0090770bb2d36f12a Mon Sep 17 00:00:00 2001 From: Eduardo Apolinario Date: Tue, 19 Dec 2023 14:34:59 -0300 Subject: [PATCH 8/8] Remove TODOs Signed-off-by: Eduardo Apolinario --- flyteplugins/go/tasks/logs/logging_utils.go | 1 - flyteplugins/go/tasks/pluginmachinery/tasklog/template.go | 4 +--- .../go/tasks/pluginmachinery/tasklog/template_test.go | 2 +- 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/flyteplugins/go/tasks/logs/logging_utils.go b/flyteplugins/go/tasks/logs/logging_utils.go index a82963a652..20f3522e27 100644 --- a/flyteplugins/go/tasks/logs/logging_utils.go +++ b/flyteplugins/go/tasks/logs/logging_utils.go @@ -113,7 +113,6 @@ func InitializeLogPlugins(cfg *LogConfig) (tasklog.Plugin, error) { if len(cfg.FlyinTemplateURI) > 0 { plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Flyin Logs", Scheme: tasklog.TemplateSchemeFlyin, TemplateURIs: []tasklog.TemplateURI{cfg.FlyinTemplateURI}, MessageFormat: core.TaskLog_JSON}) } else { - // TODO: Figure out what to use a default here. plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Flyin Logs", Scheme: tasklog.TemplateSchemeFlyin, TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("https://flyin.%s/logs/{{ .namespace }}/{{ .podName }}/{{ .containerName }}/{{ .containerId }}", cfg.GCPProjectName)}, MessageFormat: core.TaskLog_JSON}) } } diff --git a/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go b/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go index 89d9d1ce18..ea5c5f373c 100644 --- a/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go +++ b/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go @@ -88,14 +88,12 @@ func (input Input) templateVarsForScheme(scheme TemplateScheme) TemplateVars { switch scheme { case TemplateSchemeFlyin: - // TODO: Confirm that having a default port is okay. port := input.TaskTemplate.GetConfig()["port"] if port == "" { - port = "8081" + port = "8080" } vars = append( vars, - // Replace the port with the port from the task template. TemplateVar{defaultRegexes.Port, port}, ) fallthrough diff --git a/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go b/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go index 09637fb811..ad6eef25a3 100644 --- a/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go +++ b/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go @@ -583,7 +583,7 @@ func TestTemplateLogPlugin(t *testing.T) { Output{ TaskLogs: []*core.TaskLog{ { - Uri: "vscode://flyin:8081/my-pod-name", + Uri: "vscode://flyin:8080/my-pod-name", MessageFormat: core.TaskLog_JSON, }, },