From b39b631f42a39e12ea2f0f9999c4d9bdc3e83719 Mon Sep 17 00:00:00 2001 From: Andrew Dye Date: Wed, 20 Sep 2023 23:10:17 -0700 Subject: [PATCH 1/4] Handle batched TaskExecutionEvent reasons Signed-off-by: Andrew Dye --- go.mod | 3 +++ go.sum | 4 ++-- pkg/manager/impl/task_execution_manager.go | 11 +++++++---- pkg/repositories/transformers/task_execution.go | 13 ++++++++++++- 4 files changed, 24 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index b30095519..36928dd4d 100644 --- a/go.mod +++ b/go.mod @@ -210,6 +210,9 @@ require ( replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.20220915080349-5790c370e63a +// TODO: update version references once dependent PR is merged +replace github.com/flyteorg/flyteidl => github.com/andrewwdye/flyteidl v0.0.0-20230921054948-85b9c706f792 + // Retracted versions // This was published in error when attempting to create 1.5.1 Flyte release. retract v1.1.94 diff --git a/go.sum b/go.sum index ed4b6ac85..499817ea3 100644 --- a/go.sum +++ b/go.sum @@ -131,6 +131,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= +github.com/andrewwdye/flyteidl v0.0.0-20230921054948-85b9c706f792 h1:mkSxCviDJa5cg8obcdJJ0RhB5TE7v/y2pBhinafwvQc= +github.com/andrewwdye/flyteidl v0.0.0-20230921054948-85b9c706f792/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og= github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210826220005-b48c857c3a0e/go.mod h1:F7bn7fEU90QkQ3tnmaTx3LTKLEDqnwWODIYppRQ5hnY= @@ -293,8 +295,6 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/flyteorg/flyteidl v1.5.14 h1:+3ewipoOp82fPyIVgvvrMq1lorl5Kz3Lh6sh/a9+loI= -github.com/flyteorg/flyteidl v1.5.14/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og= github.com/flyteorg/flyteplugins v1.0.67 h1:d2FXpwxQwX/k4YdmhuusykOemHb/cUTPEob4WBmdpjE= github.com/flyteorg/flyteplugins v1.0.67/go.mod h1:HHt4nKDKVwrZPKDsj99dNtDSIJL378xNotYMA3a/TFA= github.com/flyteorg/flytepropeller v1.1.98 h1:Zk2ENYB9VZRT5tFUIFjm+aCkr0TU2EuyJ5gh52fpLoA= diff --git a/pkg/manager/impl/task_execution_manager.go b/pkg/manager/impl/task_execution_manager.go index 46967f264..49f05a9be 100644 --- a/pkg/manager/impl/task_execution_manager.go +++ b/pkg/manager/impl/task_execution_manager.go @@ -9,16 +9,20 @@ import ( "github.com/flyteorg/flytestdlib/promutils/labeled" - notificationInterfaces "github.com/flyteorg/flyteadmin/pkg/async/notifications/interfaces" "github.com/golang/protobuf/proto" + notificationInterfaces "github.com/flyteorg/flyteadmin/pkg/async/notifications/interfaces" + "github.com/flyteorg/flytestdlib/storage" "github.com/flyteorg/flytestdlib/contextutils" - "github.com/flyteorg/flytestdlib/promutils" "github.com/prometheus/client_golang/prometheus" + "github.com/flyteorg/flytestdlib/promutils" + + "google.golang.org/grpc/codes" + "github.com/flyteorg/flyteadmin/pkg/common" dataInterfaces "github.com/flyteorg/flyteadmin/pkg/data/interfaces" "github.com/flyteorg/flyteadmin/pkg/errors" @@ -32,7 +36,6 @@ import ( "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flytestdlib/logger" - "google.golang.org/grpc/codes" ) type taskExecutionMetrics struct { @@ -194,7 +197,7 @@ func (m *TaskExecutionManager) CreateTaskExecutionEvent(ctx context.Context, req return nil, err } - if request.Event.Phase == core.TaskExecution_RUNNING && request.Event.PhaseVersion == 0 { + if request.Event.Phase == core.TaskExecution_RUNNING && request.Event.PhaseVersion == 0 { // TODO: need to be careful about missing inc/decs m.metrics.ActiveTaskExecutions.Inc() } else if common.IsTaskExecutionTerminal(request.Event.Phase) && request.Event.PhaseVersion == 0 { m.metrics.ActiveTaskExecutions.Dec() diff --git a/pkg/repositories/transformers/task_execution.go b/pkg/repositories/transformers/task_execution.go index edfc32b19..228c84eef 100644 --- a/pkg/repositories/transformers/task_execution.go +++ b/pkg/repositories/transformers/task_execution.go @@ -386,7 +386,18 @@ func UpdateTaskExecutionModel(ctx context.Context, request *admin.TaskExecutionE } taskExecutionClosure.UpdatedAt = reportedAt taskExecutionClosure.Logs = mergeLogs(taskExecutionClosure.Logs, request.Event.Logs) - if len(request.Event.Reason) > 0 { + if len(request.Event.Reasons) > 0 { + for _, reason := range request.Event.Reasons { + taskExecutionClosure.Reasons = append( // TODO: this is where to unpack batch + taskExecutionClosure.Reasons, + &admin.Reason{ + OccurredAt: reason.OccurredAt, + Message: reason.Reason, + }) + } + // TODO: avoid dupes? + // taskExecutionClosure.Reason = request.Event.Reason + } else if len(request.Event.Reason) > 0 { if taskExecutionClosure.Reason != request.Event.Reason { // by tracking a time-series of reasons we increase the size of the TaskExecutionClosure in scenarios where // a task reports a large number of unique reasons. if this size increase becomes problematic we this logic From dfa2db621c358941bc965b6019f12ebb369edec4 Mon Sep 17 00:00:00 2001 From: Andrew Dye Date: Mon, 25 Sep 2023 17:16:20 -0700 Subject: [PATCH 2/4] Add tests Signed-off-by: Andrew Dye --- .../transformers/task_execution.go | 19 +- .../transformers/task_execution_test.go | 295 +++++++++++++++++- 2 files changed, 304 insertions(+), 10 deletions(-) diff --git a/pkg/repositories/transformers/task_execution.go b/pkg/repositories/transformers/task_execution.go index 228c84eef..b12a86694 100644 --- a/pkg/repositories/transformers/task_execution.go +++ b/pkg/repositories/transformers/task_execution.go @@ -153,19 +153,27 @@ func CreateTaskExecutionModel(ctx context.Context, input CreateTaskExecutionMode CreatedAt: input.Request.Event.OccurredAt, Logs: input.Request.Event.Logs, CustomInfo: input.Request.Event.CustomInfo, - Reason: input.Request.Event.Reason, TaskType: input.Request.Event.TaskType, Metadata: metadata, EventVersion: input.Request.Event.EventVersion, } - if len(input.Request.Event.Reason) > 0 { + if len(input.Request.Event.Reasons) > 0 { + for _, reason := range input.Request.Event.Reasons { + closure.Reasons = append(closure.Reasons, &admin.Reason{ + OccurredAt: reason.OccurredAt, + Message: reason.Reason, + }) + } + closure.Reason = input.Request.Event.Reasons[len(input.Request.Event.Reasons)-1].Reason + } else if len(input.Request.Event.Reason) > 0 { closure.Reasons = []*admin.Reason{ - &admin.Reason{ + { OccurredAt: input.Request.Event.OccurredAt, Message: input.Request.Event.Reason, }, } + closure.Reason = input.Request.Event.Reason } eventPhase := input.Request.Event.Phase @@ -388,15 +396,14 @@ func UpdateTaskExecutionModel(ctx context.Context, request *admin.TaskExecutionE taskExecutionClosure.Logs = mergeLogs(taskExecutionClosure.Logs, request.Event.Logs) if len(request.Event.Reasons) > 0 { for _, reason := range request.Event.Reasons { - taskExecutionClosure.Reasons = append( // TODO: this is where to unpack batch + taskExecutionClosure.Reasons = append( taskExecutionClosure.Reasons, &admin.Reason{ OccurredAt: reason.OccurredAt, Message: reason.Reason, }) } - // TODO: avoid dupes? - // taskExecutionClosure.Reason = request.Event.Reason + taskExecutionClosure.Reason = request.Event.Reasons[len(request.Event.Reasons)-1].Reason } else if len(request.Event.Reason) > 0 { if taskExecutionClosure.Reason != request.Event.Reason { // by tracking a time-series of reasons we increase the size of the TaskExecutionClosure in scenarios where diff --git a/pkg/repositories/transformers/task_execution_test.go b/pkg/repositories/transformers/task_execution_test.go index e3155b12f..235fbfcd9 100644 --- a/pkg/repositories/transformers/task_execution_test.go +++ b/pkg/repositories/transformers/task_execution_test.go @@ -285,7 +285,7 @@ func TestCreateTaskExecutionModelQueued(t *testing.T) { UpdatedAt: taskEventOccurredAtProto, Reason: "Task was scheduled", Reasons: []*admin.Reason{ - &admin.Reason{ + { OccurredAt: taskEventOccurredAtProto, Message: "Task was scheduled", }, @@ -406,6 +406,93 @@ func TestCreateTaskExecutionModelRunning(t *testing.T) { }, taskExecutionModel) } +func TestCreateTaskExecutionModelSingleEvents(t *testing.T) { + taskExecutionModel, err := CreateTaskExecutionModel(context.TODO(), CreateTaskExecutionModelInput{ + Request: &admin.TaskExecutionEventRequest{ + Event: &event.TaskExecutionEvent{ + TaskId: sampleTaskID, + ParentNodeExecutionId: sampleNodeExecID, + Phase: core.TaskExecution_RUNNING, + PhaseVersion: uint32(2), + RetryAttempt: 1, + InputValue: &event.TaskExecutionEvent_InputUri{ + InputUri: testInputURI, + }, + OutputResult: &event.TaskExecutionEvent_OutputUri{ + OutputUri: "output uri", + }, + OccurredAt: taskEventOccurredAtProto, + Reason: "Task event 1", + }, + }, + }) + assert.Nil(t, err) + + expectedClosure := &admin.TaskExecutionClosure{ + Phase: core.TaskExecution_RUNNING, + StartedAt: taskEventOccurredAtProto, + CreatedAt: taskEventOccurredAtProto, + UpdatedAt: taskEventOccurredAtProto, + Reason: "Task event 1", + Reasons: []*admin.Reason{ + {OccurredAt: taskEventOccurredAtProto, Message: "Task event 1"}, + }, + } + expectedClosureBytes, err := proto.Marshal(expectedClosure) + assert.Nil(t, err) + assert.Equal(t, expectedClosureBytes, taskExecutionModel.Closure) +} + +func TestCreateTaskExecutionModelBatchedEvents(t *testing.T) { + secondTaskEventOccurredAt := taskEventOccurredAt.Add(time.Second) + secondTaskEventOccurredAtProto, _ := ptypes.TimestampProto(secondTaskEventOccurredAt) + taskExecutionModel, err := CreateTaskExecutionModel(context.TODO(), CreateTaskExecutionModelInput{ + Request: &admin.TaskExecutionEventRequest{ + Event: &event.TaskExecutionEvent{ + TaskId: sampleTaskID, + ParentNodeExecutionId: sampleNodeExecID, + Phase: core.TaskExecution_RUNNING, + PhaseVersion: uint32(2), + RetryAttempt: 1, + InputValue: &event.TaskExecutionEvent_InputUri{ + InputUri: testInputURI, + }, + OutputResult: &event.TaskExecutionEvent_OutputUri{ + OutputUri: "output uri", + }, + OccurredAt: taskEventOccurredAtProto, + Reason: "Task event 1", // Here for backwards compatibility + Reasons: []*event.BatchedReason{ + { + OccurredAt: taskEventOccurredAtProto, + Reason: "Task event 1", + }, + { + OccurredAt: secondTaskEventOccurredAtProto, + Reason: "Task event 2", + }, + }, + }, + }, + }) + assert.Nil(t, err) + + expectedClosure := &admin.TaskExecutionClosure{ + Phase: core.TaskExecution_RUNNING, + StartedAt: taskEventOccurredAtProto, + CreatedAt: taskEventOccurredAtProto, + UpdatedAt: taskEventOccurredAtProto, + Reason: "Task event 2", + Reasons: []*admin.Reason{ + {OccurredAt: taskEventOccurredAtProto, Message: "Task event 1"}, + {OccurredAt: secondTaskEventOccurredAtProto, Message: "Task event 2"}, + }, + } + expectedClosureBytes, err := proto.Marshal(expectedClosure) + assert.Nil(t, err) + assert.Equal(t, expectedClosureBytes, taskExecutionModel.Closure) +} + func TestUpdateTaskExecutionModelRunningToFailed(t *testing.T) { existingClosure := &admin.TaskExecutionClosure{ Phase: core.TaskExecution_RUNNING, @@ -425,7 +512,7 @@ func TestUpdateTaskExecutionModelRunningToFailed(t *testing.T) { }), Reason: "Task was scheduled", Reasons: []*admin.Reason{ - &admin.Reason{ + { OccurredAt: taskEventOccurredAtProto, Message: "Task was scheduled", }, @@ -526,11 +613,11 @@ func TestUpdateTaskExecutionModelRunningToFailed(t *testing.T) { }), Reason: "task failed", Reasons: []*admin.Reason{ - &admin.Reason{ + { OccurredAt: taskEventOccurredAtProto, Message: "Task was scheduled", }, - &admin.Reason{ + { OccurredAt: occuredAtProto, Message: "task failed", }, @@ -569,6 +656,206 @@ func TestUpdateTaskExecutionModelRunningToFailed(t *testing.T) { } +func TestUpdateTaskExecutionModelSingleEvents(t *testing.T) { + existingClosure := &admin.TaskExecutionClosure{ + Phase: core.TaskExecution_RUNNING, + StartedAt: taskEventOccurredAtProto, + CreatedAt: taskEventOccurredAtProto, + UpdatedAt: taskEventOccurredAtProto, + Reason: "Task was scheduled", + Reasons: []*admin.Reason{ + { + OccurredAt: taskEventOccurredAtProto, + Message: "Task was scheduled", + }, + }, + } + + closureBytes, err := proto.Marshal(existingClosure) + assert.Nil(t, err) + + existingTaskExecution := models.TaskExecution{ + TaskExecutionKey: models.TaskExecutionKey{ + TaskKey: models.TaskKey{ + Project: sampleTaskID.Project, + Domain: sampleTaskID.Domain, + Name: sampleTaskID.Name, + Version: sampleTaskID.Version, + }, + NodeExecutionKey: models.NodeExecutionKey{ + NodeID: sampleNodeExecID.NodeId, + ExecutionKey: models.ExecutionKey{ + Project: sampleNodeExecID.ExecutionId.Project, + Domain: sampleNodeExecID.ExecutionId.Domain, + Name: sampleNodeExecID.ExecutionId.Name, + }, + }, + RetryAttempt: &retryAttemptValue, + }, + Phase: "TaskExecutionPhase_TASK_PHASE_RUNNING", + InputURI: "input uri", + Closure: closureBytes, + StartedAt: &taskEventOccurredAt, + TaskExecutionCreatedAt: &taskEventOccurredAt, + TaskExecutionUpdatedAt: &taskEventOccurredAt, + } + + occuredAt := taskEventOccurredAt.Add(time.Minute) + occuredAtProto, err := ptypes.TimestampProto(occuredAt) + assert.Nil(t, err) + + taskEventRequest := &admin.TaskExecutionEventRequest{ + Event: &event.TaskExecutionEvent{ + TaskId: sampleTaskID, + ParentNodeExecutionId: sampleNodeExecID, + Phase: core.TaskExecution_RUNNING, + RetryAttempt: 1, + InputValue: &event.TaskExecutionEvent_InputUri{ + InputUri: testInputURI, + }, + OutputResult: &event.TaskExecutionEvent_OutputUri{ + OutputUri: "output uri", + }, + OccurredAt: occuredAtProto, + Reason: "update 1", + }, + } + + err = UpdateTaskExecutionModel(context.TODO(), taskEventRequest, &existingTaskExecution, + interfaces.InlineEventDataPolicyStoreInline, commonMocks.GetMockStorageClient()) + assert.Nil(t, err) + + expectedClosure := &admin.TaskExecutionClosure{ + Phase: core.TaskExecution_RUNNING, + StartedAt: taskEventOccurredAtProto, + UpdatedAt: occuredAtProto, + CreatedAt: taskEventOccurredAtProto, + Reason: "update 1", + Reasons: []*admin.Reason{ + { + OccurredAt: taskEventOccurredAtProto, + Message: "Task was scheduled", + }, + { + OccurredAt: occuredAtProto, + Message: "update 1", + }, + }, + } + + expectedClosureBytes, err := proto.Marshal(expectedClosure) + assert.Nil(t, err) + assert.Equal(t, expectedClosureBytes, existingTaskExecution.Closure) +} + +func TestUpdateTaskExecutionModelBatchedEvents(t *testing.T) { + existingClosure := &admin.TaskExecutionClosure{ + Phase: core.TaskExecution_RUNNING, + StartedAt: taskEventOccurredAtProto, + CreatedAt: taskEventOccurredAtProto, + UpdatedAt: taskEventOccurredAtProto, + Reason: "Task was scheduled", + Reasons: []*admin.Reason{ + { + OccurredAt: taskEventOccurredAtProto, + Message: "Task was scheduled", + }, + }, + } + + closureBytes, err := proto.Marshal(existingClosure) + assert.Nil(t, err) + + existingTaskExecution := models.TaskExecution{ + TaskExecutionKey: models.TaskExecutionKey{ + TaskKey: models.TaskKey{ + Project: sampleTaskID.Project, + Domain: sampleTaskID.Domain, + Name: sampleTaskID.Name, + Version: sampleTaskID.Version, + }, + NodeExecutionKey: models.NodeExecutionKey{ + NodeID: sampleNodeExecID.NodeId, + ExecutionKey: models.ExecutionKey{ + Project: sampleNodeExecID.ExecutionId.Project, + Domain: sampleNodeExecID.ExecutionId.Domain, + Name: sampleNodeExecID.ExecutionId.Name, + }, + }, + RetryAttempt: &retryAttemptValue, + }, + Phase: "TaskExecutionPhase_TASK_PHASE_RUNNING", + InputURI: "input uri", + Closure: closureBytes, + StartedAt: &taskEventOccurredAt, + TaskExecutionCreatedAt: &taskEventOccurredAt, + TaskExecutionUpdatedAt: &taskEventOccurredAt, + } + + occuredAt := taskEventOccurredAt.Add(time.Minute) + occuredAtProto, err := ptypes.TimestampProto(occuredAt) + assert.Nil(t, err) + secondOccuredAt := taskEventOccurredAt.Add(time.Minute * 2) + secondOccuredAtProto, err := ptypes.TimestampProto(secondOccuredAt) + + taskEventRequest := &admin.TaskExecutionEventRequest{ + Event: &event.TaskExecutionEvent{ + TaskId: sampleTaskID, + ParentNodeExecutionId: sampleNodeExecID, + Phase: core.TaskExecution_RUNNING, + RetryAttempt: 1, + InputValue: &event.TaskExecutionEvent_InputUri{ + InputUri: testInputURI, + }, + OutputResult: &event.TaskExecutionEvent_OutputUri{ + OutputUri: "output uri", + }, + OccurredAt: occuredAtProto, + Reason: "update 1", // Here for backwards compatibility + Reasons: []*event.BatchedReason{ + &event.BatchedReason{ + OccurredAt: occuredAtProto, + Reason: "update 1", + }, + &event.BatchedReason{ + OccurredAt: secondOccuredAtProto, + Reason: "update 2", + }, + }, + }, + } + + err = UpdateTaskExecutionModel(context.TODO(), taskEventRequest, &existingTaskExecution, + interfaces.InlineEventDataPolicyStoreInline, commonMocks.GetMockStorageClient()) + assert.Nil(t, err) + + expectedClosure := &admin.TaskExecutionClosure{ + Phase: core.TaskExecution_RUNNING, + StartedAt: taskEventOccurredAtProto, + UpdatedAt: occuredAtProto, + CreatedAt: taskEventOccurredAtProto, + Reason: "update 2", + Reasons: []*admin.Reason{ + { + OccurredAt: taskEventOccurredAtProto, + Message: "Task was scheduled", + }, + { + OccurredAt: occuredAtProto, + Message: "update 1", + }, + { + OccurredAt: secondOccuredAtProto, + Message: "update 2", + }, + }, + } + + expectedClosureBytes, err := proto.Marshal(expectedClosure) + assert.Nil(t, err) + assert.Equal(t, expectedClosureBytes, existingTaskExecution.Closure) +} + func TestFromTaskExecutionModel(t *testing.T) { taskClosure := &admin.TaskExecutionClosure{ Phase: core.TaskExecution_RUNNING, From fd50f54c9a28fd2a076acc694e948d0544589697 Mon Sep 17 00:00:00 2001 From: Andrew Dye Date: Wed, 27 Sep 2023 15:08:45 -0700 Subject: [PATCH 3/4] Update flyteidl version Signed-off-by: Andrew Dye --- go.mod | 5 +---- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index 36928dd4d..328657fe0 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/cloudevents/sdk-go/v2 v2.8.0 github.com/coreos/go-oidc v2.2.1+incompatible github.com/evanphx/json-patch v4.12.0+incompatible - github.com/flyteorg/flyteidl v1.5.14 + github.com/flyteorg/flyteidl v1.5.21 github.com/flyteorg/flyteplugins v1.0.67 github.com/flyteorg/flytepropeller v1.1.98 github.com/flyteorg/flytestdlib v1.0.22 @@ -210,9 +210,6 @@ require ( replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.20220915080349-5790c370e63a -// TODO: update version references once dependent PR is merged -replace github.com/flyteorg/flyteidl => github.com/andrewwdye/flyteidl v0.0.0-20230921054948-85b9c706f792 - // Retracted versions // This was published in error when attempting to create 1.5.1 Flyte release. retract v1.1.94 diff --git a/go.sum b/go.sum index 499817ea3..25b14ed83 100644 --- a/go.sum +++ b/go.sum @@ -131,8 +131,6 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/andrewwdye/flyteidl v0.0.0-20230921054948-85b9c706f792 h1:mkSxCviDJa5cg8obcdJJ0RhB5TE7v/y2pBhinafwvQc= -github.com/andrewwdye/flyteidl v0.0.0-20230921054948-85b9c706f792/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og= github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210826220005-b48c857c3a0e/go.mod h1:F7bn7fEU90QkQ3tnmaTx3LTKLEDqnwWODIYppRQ5hnY= @@ -295,6 +293,8 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/flyteorg/flyteidl v1.5.21 h1:zP1byUlNFqstTe7Io1DiiNgNf+mZAVmGZM04oIUA5kU= +github.com/flyteorg/flyteidl v1.5.21/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og= github.com/flyteorg/flyteplugins v1.0.67 h1:d2FXpwxQwX/k4YdmhuusykOemHb/cUTPEob4WBmdpjE= github.com/flyteorg/flyteplugins v1.0.67/go.mod h1:HHt4nKDKVwrZPKDsj99dNtDSIJL378xNotYMA3a/TFA= github.com/flyteorg/flytepropeller v1.1.98 h1:Zk2ENYB9VZRT5tFUIFjm+aCkr0TU2EuyJ5gh52fpLoA= From c049ced44b1126f5320b465859fbd53a50af0f9d Mon Sep 17 00:00:00 2001 From: Andrew Dye Date: Wed, 27 Sep 2023 15:19:34 -0700 Subject: [PATCH 4/4] Update to EventReason Signed-off-by: Andrew Dye --- pkg/repositories/transformers/task_execution_test.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/repositories/transformers/task_execution_test.go b/pkg/repositories/transformers/task_execution_test.go index 235fbfcd9..b196cf01a 100644 --- a/pkg/repositories/transformers/task_execution_test.go +++ b/pkg/repositories/transformers/task_execution_test.go @@ -462,7 +462,7 @@ func TestCreateTaskExecutionModelBatchedEvents(t *testing.T) { }, OccurredAt: taskEventOccurredAtProto, Reason: "Task event 1", // Here for backwards compatibility - Reasons: []*event.BatchedReason{ + Reasons: []*event.EventReason{ { OccurredAt: taskEventOccurredAtProto, Reason: "Task event 1", @@ -797,6 +797,7 @@ func TestUpdateTaskExecutionModelBatchedEvents(t *testing.T) { assert.Nil(t, err) secondOccuredAt := taskEventOccurredAt.Add(time.Minute * 2) secondOccuredAtProto, err := ptypes.TimestampProto(secondOccuredAt) + assert.Nil(t, err) taskEventRequest := &admin.TaskExecutionEventRequest{ Event: &event.TaskExecutionEvent{ @@ -812,12 +813,12 @@ func TestUpdateTaskExecutionModelBatchedEvents(t *testing.T) { }, OccurredAt: occuredAtProto, Reason: "update 1", // Here for backwards compatibility - Reasons: []*event.BatchedReason{ - &event.BatchedReason{ + Reasons: []*event.EventReason{ + { OccurredAt: occuredAtProto, Reason: "update 1", }, - &event.BatchedReason{ + { OccurredAt: secondOccuredAtProto, Reason: "update 2", },