diff --git a/pkg/manager/impl/execution_manager.go b/pkg/manager/impl/execution_manager.go index bf926319c..169ddde85 100644 --- a/pkg/manager/impl/execution_manager.go +++ b/pkg/manager/impl/execution_manager.go @@ -1442,7 +1442,12 @@ func (m *ExecutionManager) ListExecutions( logger.Debugf(ctx, "Failed to list executions using input [%+v] with err %v", listExecutionsInput, err) return nil, err } - executionList, err := transformers.FromExecutionModels(output.Executions, transformers.ListExecutionTransformerOptions) + + listExecutionTransformer := &transformers.ExecutionTransformerOptions{ + TrimErrorMessage: m.config.ApplicationConfiguration().GetTopLevelConfig().ListExecutionTransformersConfig.TrimErrorMessages, + MaxErrorMessageLength: m.config.ApplicationConfiguration().GetTopLevelConfig().ListExecutionTransformersConfig.MaxErrorMessageLength, + } + executionList, err := transformers.FromExecutionModels(output.Executions, listExecutionTransformer) if err != nil { logger.Errorf(ctx, "Failed to transform execution models [%+v] with err: %v", output.Executions, err) diff --git a/pkg/manager/impl/node_execution_manager.go b/pkg/manager/impl/node_execution_manager.go index ae5a7bb40..8f4f908b0 100644 --- a/pkg/manager/impl/node_execution_manager.go +++ b/pkg/manager/impl/node_execution_manager.go @@ -333,6 +333,11 @@ func (m *NodeExecutionManager) transformNodeExecutionModel(ctx context.Context, func (m *NodeExecutionManager) transformNodeExecutionModelList(ctx context.Context, nodeExecutionModels []models.NodeExecution) ([]*admin.NodeExecution, error) { nodeExecutions := make([]*admin.NodeExecution, len(nodeExecutionModels)) + listExecutionTransformer := &transformers.ExecutionTransformerOptions{ + TrimErrorMessage: m.config.ApplicationConfiguration().GetTopLevelConfig().ListExecutionTransformersConfig.TrimErrorMessages, + MaxErrorMessageLength: m.config.ApplicationConfiguration().GetTopLevelConfig().ListExecutionTransformersConfig.MaxErrorMessageLength, + } + for idx, nodeExecutionModel := range nodeExecutionModels { nodeExecution, err := m.transformNodeExecutionModel(ctx, nodeExecutionModel, &core.NodeExecutionIdentifier{ ExecutionId: &core.WorkflowExecutionIdentifier{ @@ -341,7 +346,7 @@ func (m *NodeExecutionManager) transformNodeExecutionModelList(ctx context.Conte Name: nodeExecutionModel.Name, }, NodeId: nodeExecutionModel.NodeID, - }, transformers.ListExecutionTransformerOptions) + }, listExecutionTransformer) if err != nil { return nil, err } diff --git a/pkg/manager/impl/node_execution_manager_test.go b/pkg/manager/impl/node_execution_manager_test.go index a1c43c36b..0263d6c15 100644 --- a/pkg/manager/impl/node_execution_manager_test.go +++ b/pkg/manager/impl/node_execution_manager_test.go @@ -537,7 +537,8 @@ func TestTransformNodeExecutionModelList(t *testing.T) { }) manager := NodeExecutionManager{ - db: repository, + db: repository, + config: getMockExecutionsConfigProvider(), } nodeExecutions, err := manager.transformNodeExecutionModelList(ctx, []models.NodeExecution{ { diff --git a/pkg/manager/impl/testutils/config.go b/pkg/manager/impl/testutils/config.go index c2fe20139..b35ebadee 100644 --- a/pkg/manager/impl/testutils/config.go +++ b/pkg/manager/impl/testutils/config.go @@ -30,5 +30,8 @@ func GetApplicationConfigWithDefaultDomains() runtimeInterfaces.ApplicationConfi Scheme: common.Local, SignedURL: runtimeInterfaces.SignedURL{ Enabled: true, }}) + + config.GetTopLevelConfig().ListExecutionTransformersConfig.TrimErrorMessages = true + config.GetTopLevelConfig().ListExecutionTransformersConfig.MaxErrorMessageLength = 10240 return &config } diff --git a/pkg/repositories/transformers/execution.go b/pkg/repositories/transformers/execution.go index e92fbc589..71467fc11 100644 --- a/pkg/repositories/transformers/execution.go +++ b/pkg/repositories/transformers/execution.go @@ -22,8 +22,6 @@ import ( "k8s.io/apimachinery/pkg/util/sets" ) -const trimmedErrMessageLen = 100 - var clusterReassignablePhases = sets.NewString(core.WorkflowExecution_UNDEFINED.String(), core.WorkflowExecution_QUEUED.String()) // CreateExecutionModelInput encapsulates request parameters for calls to CreateExecutionModel. @@ -47,13 +45,11 @@ type CreateExecutionModelInput struct { } type ExecutionTransformerOptions struct { - TrimErrorMessage bool + TrimErrorMessage bool + MaxErrorMessageLength int } var DefaultExecutionTransformerOptions = &ExecutionTransformerOptions{} -var ListExecutionTransformerOptions = &ExecutionTransformerOptions{ - TrimErrorMessage: true, -} // CreateExecutionModel transforms a ExecutionCreateRequest to a Execution model func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, error) { @@ -328,8 +324,8 @@ func FromExecutionModel(executionModel models.Execution, opts *ExecutionTransfor } if closure.GetError() != nil && opts != nil && opts.TrimErrorMessage && len(closure.GetError().Message) > 0 { trimmedErrOutputResult := closure.GetError() - if len(trimmedErrOutputResult.Message) > trimmedErrMessageLen { - trimmedErrOutputResult.Message = trimmedErrOutputResult.Message[0:trimmedErrMessageLen] + if len(trimmedErrOutputResult.Message) > opts.MaxErrorMessageLength { + trimmedErrOutputResult.Message = trimmedErrOutputResult.Message[0:opts.MaxErrorMessageLength] } closure.OutputResult = &admin.ExecutionClosure_Error{ Error: trimmedErrOutputResult, diff --git a/pkg/repositories/transformers/execution_test.go b/pkg/repositories/transformers/execution_test.go index fc42a82cd..4402628dd 100644 --- a/pkg/repositories/transformers/execution_test.go +++ b/pkg/repositories/transformers/execution_test.go @@ -570,6 +570,7 @@ func TestFromExecutionModel_Aborted(t *testing.T) { } func TestFromExecutionModel_Error(t *testing.T) { + trimmedErrMessageLen := 10240 extraLongErrMsg := string(make([]byte, 2*trimmedErrMessageLen)) execErr := &core.ExecutionError{ Code: "CODE", @@ -590,7 +591,8 @@ func TestFromExecutionModel_Error(t *testing.T) { Closure: executionClosureBytes, } execution, err := FromExecutionModel(executionModel, &ExecutionTransformerOptions{ - TrimErrorMessage: true, + TrimErrorMessage: true, + MaxErrorMessageLength: trimmedErrMessageLen, }) expectedExecErr := execErr expectedExecErr.Message = string(make([]byte, trimmedErrMessageLen)) diff --git a/pkg/repositories/transformers/node_execution.go b/pkg/repositories/transformers/node_execution.go index f1d90361f..5ebaf3974 100644 --- a/pkg/repositories/transformers/node_execution.go +++ b/pkg/repositories/transformers/node_execution.go @@ -323,8 +323,8 @@ func FromNodeExecutionModel(nodeExecutionModel models.NodeExecution, opts *Execu } if closure.GetError() != nil && opts != nil && opts.TrimErrorMessage && len(closure.GetError().Message) > 0 { trimmedErrOutputResult := closure.GetError() - if len(trimmedErrOutputResult.Message) > trimmedErrMessageLen { - trimmedErrOutputResult.Message = trimmedErrOutputResult.Message[0:trimmedErrMessageLen] + if len(trimmedErrOutputResult.Message) > opts.MaxErrorMessageLength { + trimmedErrOutputResult.Message = trimmedErrOutputResult.Message[0:opts.MaxErrorMessageLength] } closure.OutputResult = &admin.NodeExecutionClosure_Error{ Error: trimmedErrOutputResult, diff --git a/pkg/repositories/transformers/node_execution_test.go b/pkg/repositories/transformers/node_execution_test.go index 88ef8fc26..dc4bbb3de 100644 --- a/pkg/repositories/transformers/node_execution_test.go +++ b/pkg/repositories/transformers/node_execution_test.go @@ -528,6 +528,7 @@ func TestFromNodeExecutionModel(t *testing.T) { } func TestFromNodeExecutionModel_Error(t *testing.T) { + trimmedErrMessageLen := 10240 extraLongErrMsg := string(make([]byte, 2*trimmedErrMessageLen)) execErr := &core.ExecutionError{ Code: "CODE", @@ -551,7 +552,10 @@ func TestFromNodeExecutionModel_Error(t *testing.T) { NodeExecutionMetadata: nodeExecutionMetadataBytes, InputURI: "input uri", Duration: duration, - }, &ExecutionTransformerOptions{TrimErrorMessage: true}) + }, &ExecutionTransformerOptions{ + TrimErrorMessage: true, + MaxErrorMessageLength: trimmedErrMessageLen, + }) assert.Nil(t, err) expectedExecErr := execErr diff --git a/pkg/repositories/transformers/task_execution.go b/pkg/repositories/transformers/task_execution.go index e3eca3884..17e9c40bd 100644 --- a/pkg/repositories/transformers/task_execution.go +++ b/pkg/repositories/transformers/task_execution.go @@ -441,8 +441,8 @@ func FromTaskExecutionModel(taskExecutionModel models.TaskExecution, opts *Execu } if closure.GetError() != nil && opts != nil && opts.TrimErrorMessage && len(closure.GetError().Message) > 0 { trimmedErrOutputResult := closure.GetError() - if len(trimmedErrOutputResult.Message) > trimmedErrMessageLen { - trimmedErrOutputResult.Message = trimmedErrOutputResult.Message[0:trimmedErrMessageLen] + if len(trimmedErrOutputResult.Message) > opts.MaxErrorMessageLength { + trimmedErrOutputResult.Message = trimmedErrOutputResult.Message[0:opts.MaxErrorMessageLength] } closure.OutputResult = &admin.TaskExecutionClosure_Error{ Error: trimmedErrOutputResult, diff --git a/pkg/repositories/transformers/task_execution_test.go b/pkg/repositories/transformers/task_execution_test.go index da7af92a2..d24840950 100644 --- a/pkg/repositories/transformers/task_execution_test.go +++ b/pkg/repositories/transformers/task_execution_test.go @@ -600,6 +600,7 @@ func TestFromTaskExecutionModel(t *testing.T) { } func TestFromTaskExecutionModel_Error(t *testing.T) { + trimmedErrMessageLen := 10240 extraLongErrMsg := string(make([]byte, 2*trimmedErrMessageLen)) execErr := &core.ExecutionError{ Code: "CODE", @@ -633,7 +634,8 @@ func TestFromTaskExecutionModel_Error(t *testing.T) { Closure: closureBytes, } taskExecution, err := FromTaskExecutionModel(taskExecutionModel, &ExecutionTransformerOptions{ - TrimErrorMessage: true, + TrimErrorMessage: true, + MaxErrorMessageLength: trimmedErrMessageLen, }) expectedExecErr := execErr @@ -653,7 +655,8 @@ func TestFromTaskExecutionModel_Error(t *testing.T) { }) taskExecutionModel.Closure = closureBytes taskExecution, err = FromTaskExecutionModel(taskExecutionModel, &ExecutionTransformerOptions{ - TrimErrorMessage: true, + TrimErrorMessage: true, + MaxErrorMessageLength: trimmedErrMessageLen, }) expectedExecErr = execErr expectedExecErr.Message = string(make([]byte, 10)) diff --git a/pkg/runtime/application_config_provider.go b/pkg/runtime/application_config_provider.go index 3b8b0a270..01849d7b6 100644 --- a/pkg/runtime/application_config_provider.go +++ b/pkg/runtime/application_config_provider.go @@ -33,6 +33,11 @@ var flyteAdminConfig = config.MustRegisterSection(flyteAdmin, &interfaces.Applic MaxParallelism: 25, K8SServiceAccount: "", UseOffloadedWorkflowClosure: false, + + ListExecutionTransformersConfig: interfaces.ExecutionTransformersConfig{ + TrimErrorMessages: true, + MaxErrorMessageLength: 10240, + }, }) var schedulerConfig = config.MustRegisterSection(scheduler, &interfaces.SchedulerConfig{ diff --git a/pkg/runtime/interfaces/application_configuration.go b/pkg/runtime/interfaces/application_configuration.go index 16b1f921d..7968f6ad2 100644 --- a/pkg/runtime/interfaces/application_configuration.go +++ b/pkg/runtime/interfaces/application_configuration.go @@ -48,6 +48,13 @@ type PostgresConfig struct { Debug bool `json:"debug" pflag:" Whether or not to start the database connection with debug mode enabled."` } +type ExecutionTransformersConfig struct { + // TrimErrorMessages indicates whether error messages returned by the list workflow execution API should be trimmed. + TrimErrorMessages bool `json:"trimErrorMessages"` + // The maximum length of an error message returned by the list workflow execution API. + MaxErrorMessageLength int `json:"maxErrorMessageLength"` +} + // ApplicationConfig is the base configuration to start admin type ApplicationConfig struct { // The RoleName key inserted as an annotation (https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/) @@ -94,6 +101,9 @@ type ApplicationConfig struct { // Enabling will use Storage (s3/gcs/etc) to offload static parts of CRDs. UseOffloadedWorkflowClosure bool `json:"useOffloadedWorkflowClosure"` + + // Configures the execution transformers + ListExecutionTransformersConfig ExecutionTransformersConfig `json:"listExecutionTransformersConfig"` } func (a *ApplicationConfig) GetRoleNameKey() string {