From 6340bcbdf4f91b1fd4c95a7de20329b30511a582 Mon Sep 17 00:00:00 2001 From: JamesMurkin Date: Tue, 29 Aug 2023 12:37:03 +0100 Subject: [PATCH] Fix bug causing GetJobSetEvents to get stuck (#2903) * Add error message of final job run to JobFailedMessage When we hit the maximum retry limit, the JobFailedMessage just says something along the lines of "Job has been retried too many times, giving up" Now we include the final run error in that message - to make it easier to work out the cause of retries * Fix bug causing GetJobSetEvents to get stuck GetJobSetEvents only increments its fromId variable on sending new messages However not all redis events produce api events that will be sent downstream The issue here is if we get 500 redis events in a row that don't produce api events, then the fromId never gets updated - Meaning the watching gets stuck here To fix this, ReadEvents now returns a lastMessageId. So if there are no messages to process, the fromId should be updated using the lastMessageId * Formatting --- internal/armada/repository/event.go | 23 +++++++---- internal/armada/repository/event_test.go | 51 ++++++++++++++++++++---- internal/armada/server/event.go | 22 ++++++---- internal/scheduler/scheduler.go | 6 +++ 4 files changed, 79 insertions(+), 23 deletions(-) diff --git a/internal/armada/repository/event.go b/internal/armada/repository/event.go index 76538f2fa89..2e05ba377c6 100644 --- a/internal/armada/repository/event.go +++ b/internal/armada/repository/event.go @@ -26,7 +26,7 @@ const ( type EventRepository interface { CheckStreamExists(queue string, jobSetId string) (bool, error) - ReadEvents(queue, jobSetId string, lastId string, limit int64, block time.Duration) ([]*api.EventStreamMessage, error) + ReadEvents(queue, jobSetId string, lastId string, limit int64, block time.Duration) ([]*api.EventStreamMessage, *sequence.ExternalSeqNo, error) GetLastMessageId(queue, jobSetId string) (string, error) } @@ -65,10 +65,10 @@ func (repo *RedisEventRepository) 
CheckStreamExists(queue string, jobSetId strin return exists, nil } -func (repo *RedisEventRepository) ReadEvents(queue string, jobSetId string, lastId string, limit int64, block time.Duration) ([]*api.EventStreamMessage, error) { +func (repo *RedisEventRepository) ReadEvents(queue string, jobSetId string, lastId string, limit int64, block time.Duration) ([]*api.EventStreamMessage, *sequence.ExternalSeqNo, error) { from, err := sequence.Parse(lastId) if err != nil { - return nil, err + return nil, nil, err } seqId := from.PrevRedisId() cmd, err := repo.db.XRead(&redis.XReadArgs{ @@ -79,30 +79,37 @@ func (repo *RedisEventRepository) ReadEvents(queue string, jobSetId string, last // redis signals empty list by Nil if err == redis.Nil { - return make([]*api.EventStreamMessage, 0), nil + return make([]*api.EventStreamMessage, 0), nil, nil } else if err != nil { - return nil, errors.WithStack(fmt.Errorf("%s (fromId: %s, seqId: %s)", err, from, seqId)) + return nil, nil, errors.WithStack(fmt.Errorf("%s (fromId: %s, seqId: %s)", err, from, seqId)) } + var lastMessageId *sequence.ExternalSeqNo = nil messages := make([]*api.EventStreamMessage, 0, len(cmd[0].Messages)) for _, m := range cmd[0].Messages { // TODO: here we decompress all the events we fetched from the db- it would be much better // If we could decompress lazily, but the interface confines us somewhat here apiEvents, err := repo.extractEvents(m, queue, jobSetId) if err != nil { - return nil, err + return nil, nil, err + } + // Set a default id for the message, if there are apiEvents produced by this message then they'll overwrite this value + lastMessageId, err = sequence.FromRedisId(m.ID, 0, true) + if err != nil { + return nil, nil, err } for i, msg := range apiEvents { msgId, err := sequence.FromRedisId(m.ID, i, i == len(apiEvents)-1) if err != nil { - return nil, err + return nil, nil, err } + lastMessageId = msgId if msgId.IsAfter(from) { messages = append(messages, &api.EventStreamMessage{Id: 
msgId.String(), Message: msg}) } } } - return messages, nil + return messages, lastMessageId, nil } func (repo *RedisEventRepository) GetLastMessageId(queue, jobSetId string) (string, error) { diff --git a/internal/armada/repository/event_test.go b/internal/armada/repository/event_test.go index 5a8e0a4d80b..fdbc641404f 100644 --- a/internal/armada/repository/event_test.go +++ b/internal/armada/repository/event_test.go @@ -9,6 +9,7 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/assert" + "github.com/armadaproject/armada/internal/armada/repository/sequence" "github.com/armadaproject/armada/internal/common/compress" "github.com/armadaproject/armada/pkg/api" "github.com/armadaproject/armada/pkg/armadaevents" @@ -85,6 +86,26 @@ var running = &armadaevents.EventSequence_Event{ }, } +var runSucceeded = &armadaevents.EventSequence_Event{ + Created: &baseTime, + Event: &armadaevents.EventSequence_Event_JobRunSucceeded{ + JobRunSucceeded: &armadaevents.JobRunSucceeded{ + RunId: runIdProto, + JobId: jobIdProto, + ResourceInfos: []*armadaevents.KubernetesResourceInfo{ + { + Info: &armadaevents.KubernetesResourceInfo_PodInfo{ + PodInfo: &armadaevents.PodInfo{ + NodeName: nodeName, + PodNumber: podNumber, + }, + }, + }, + }, + }, + }, +} + var expectedPending = api.EventMessage{ Events: &api.EventMessage_Pending{ Pending: &api.JobPendingEvent{ @@ -124,20 +145,35 @@ func TestRead(t *testing.T) { assert.NoError(t, err) // Fetch from beginning - events, err := r.ReadEvents(testQueue, jobSetName, "", 500, 1*time.Second) + events, lastMessageId, err := r.ReadEvents(testQueue, jobSetName, "", 500, 1*time.Second) assert.NoError(t, err) - assertExpected(t, events, &expectedPending, &expectedRunning) + assertExpected(t, events, lastMessageId, &expectedPending, &expectedRunning) // Fetch from offset in the middle offset := events[0].Id - events, err = r.ReadEvents(testQueue, jobSetName, offset, 500, 1*time.Second) + events, lastMessageId, err = r.ReadEvents(testQueue, 
jobSetName, offset, 500, 1*time.Second) assert.NoError(t, err) - assertExpected(t, events, &expectedRunning) + assertExpected(t, events, lastMessageId, &expectedRunning) // Fetch from offset after offset = events[0].Id - events, err = r.ReadEvents(testQueue, jobSetName, offset, 500, 1*time.Second) + events, lastMessageId, err = r.ReadEvents(testQueue, jobSetName, offset, 500, 1*time.Second) + assert.NoError(t, err) + assert.Nil(t, lastMessageId) + assert.Equal(t, 0, len(events)) + + // Fetch for events that won't produce api events + // JobRunSucceeded doesn't result in an api event, so expect: + // - No events + // - Last message Id to be non-nil + err = storeEvents(r, runSucceeded) + assert.NoError(t, err) + offSetId, err := sequence.Parse(offset) + assert.NoError(t, err) + events, lastMessageId, err = r.ReadEvents(testQueue, jobSetName, offSetId.String(), 500, 1*time.Second) assert.NoError(t, err) + assert.NotNil(t, lastMessageId) + assert.True(t, lastMessageId.IsAfter(offSetId)) assert.Equal(t, 0, len(events)) }) } @@ -152,7 +188,7 @@ func TestGetLastId(t *testing.T) { // Now create the stream and fetch the events to manually determine the last id err = storeEvents(r, assigned, running) assert.NoError(t, err) - events, err := r.ReadEvents(testQueue, jobSetName, "", 500, 1*time.Second) + events, _, err := r.ReadEvents(testQueue, jobSetName, "", 500, 1*time.Second) assert.NoError(t, err) actualLastId := events[1].Id @@ -189,12 +225,13 @@ func withRedisEventRepository(action func(r *RedisEventRepository)) { action(repo) } -func assertExpected(t *testing.T, actual []*api.EventStreamMessage, expected ...*api.EventMessage) { +func assertExpected(t *testing.T, actual []*api.EventStreamMessage, lastMessageId *sequence.ExternalSeqNo, expected ...*api.EventMessage) { assert.Equal(t, len(actual), len(expected)) for i, streamMessage := range expected { assert.Equal(t, expected[i].Events, streamMessage.Events) } + assert.Equal(t, actual[len(actual)-1].Id, 
lastMessageId.String()) } func storeEvents(r *RedisEventRepository, events ...*armadaevents.EventSequence_Event) error { diff --git a/internal/armada/server/event.go b/internal/armada/server/event.go index 20235365994..484f1a3a9f9 100644 --- a/internal/armada/server/event.go +++ b/internal/armada/server/event.go @@ -193,20 +193,26 @@ func (s *EventServer) serveEventsFromRepository(request *api.JobSetRequest, even default: } - messages, err := eventRepository.ReadEvents(request.Queue, request.Id, fromId, 500, timeout) + messages, lastMessageId, err := eventRepository.ReadEvents(request.Queue, request.Id, fromId, 500, timeout) if err != nil { return status.Errorf(codes.Unavailable, "[GetJobSetEvents] error reading events: %s", err) } stop := len(messages) == 0 - for _, msg := range messages { - fromId = msg.Id - if fromId == stopAfter { - stop = true + if len(messages) == 0 { + if lastMessageId != nil { + fromId = lastMessageId.String() } - err = stream.Send(msg) - if err != nil { - return status.Errorf(codes.Unavailable, "[GetJobSetEvents] error sending event: %s", err) + } else { + for _, msg := range messages { + fromId = msg.Id + if fromId == stopAfter { + stop = true + } + err = stream.Send(msg) + if err != nil { + return status.Errorf(codes.Unavailable, "[GetJobSetEvents] error sending event: %s", err) + } } } diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index fc1c629dc6f..34c74e353c1 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -638,6 +638,12 @@ func (s *Scheduler) generateUpdateMessagesFromJob(job *jobdb.Job, jobRunErrors m if failFast { errorMessage = fmt.Sprintf("Job has fail fast flag set - this job will no longer be retried") } + + if runError != nil && runError.GetPodLeaseReturned() != nil { + errorMessage += "\n\n" + "Final run error:" + errorMessage += "\n" + runError.GetPodLeaseReturned().GetMessage() + } + runError = &armadaevents.Error{ Terminal: true, Reason: 
&armadaevents.Error_MaxRunsExceeded{