Skip to content

Commit

Permalink
Fix bug causing GetJobSetEvents to get stuck (#2903)
Browse files Browse the repository at this point in the history
* Add error message of final job run to JobFailedMessage

When we hit the maximum retry limit, the JobFailedMessage just says something along the lines of
"Job has been retried too many times, giving up"

Now we include the final run error in that message - to make it easier to work out the cause of retries

* Fix bug causing GetJobSetEvents to get stuck

GetJobSetEvents only increments its fromId variable on sending new messages

However, not all redis events produce api events that will be sent downstream

The issue is that if we read 500 redis events in a row and none of them produce api events, the fromId never gets updated
 - Meaning the watcher gets stuck re-reading the same events

To fix this, ReadEvents now returns a lastMessageId, so if there are no messages to send, the fromId is updated using the lastMessageId instead

* Formatting
  • Loading branch information
JamesMurkin authored Aug 29, 2023
1 parent 59e8f58 commit 6340bcb
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 23 deletions.
23 changes: 15 additions & 8 deletions internal/armada/repository/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const (

type EventRepository interface {
CheckStreamExists(queue string, jobSetId string) (bool, error)
ReadEvents(queue, jobSetId string, lastId string, limit int64, block time.Duration) ([]*api.EventStreamMessage, error)
ReadEvents(queue, jobSetId string, lastId string, limit int64, block time.Duration) ([]*api.EventStreamMessage, *sequence.ExternalSeqNo, error)
GetLastMessageId(queue, jobSetId string) (string, error)
}

Expand Down Expand Up @@ -65,10 +65,10 @@ func (repo *RedisEventRepository) CheckStreamExists(queue string, jobSetId strin
return exists, nil
}

func (repo *RedisEventRepository) ReadEvents(queue string, jobSetId string, lastId string, limit int64, block time.Duration) ([]*api.EventStreamMessage, error) {
func (repo *RedisEventRepository) ReadEvents(queue string, jobSetId string, lastId string, limit int64, block time.Duration) ([]*api.EventStreamMessage, *sequence.ExternalSeqNo, error) {
from, err := sequence.Parse(lastId)
if err != nil {
return nil, err
return nil, nil, err
}
seqId := from.PrevRedisId()
cmd, err := repo.db.XRead(&redis.XReadArgs{
Expand All @@ -79,30 +79,37 @@ func (repo *RedisEventRepository) ReadEvents(queue string, jobSetId string, last

// redis signals empty list by Nil
if err == redis.Nil {
return make([]*api.EventStreamMessage, 0), nil
return make([]*api.EventStreamMessage, 0), nil, nil
} else if err != nil {
return nil, errors.WithStack(fmt.Errorf("%s (fromId: %s, seqId: %s)", err, from, seqId))
return nil, nil, errors.WithStack(fmt.Errorf("%s (fromId: %s, seqId: %s)", err, from, seqId))
}

var lastMessageId *sequence.ExternalSeqNo = nil
messages := make([]*api.EventStreamMessage, 0, len(cmd[0].Messages))
for _, m := range cmd[0].Messages {
// TODO: here we decompress all the events we fetched from the db- it would be much better
// If we could decompress lazily, but the interface confines us somewhat here
apiEvents, err := repo.extractEvents(m, queue, jobSetId)
if err != nil {
return nil, err
return nil, nil, err
}
// Set a default id for the message, if there are apiEvents produced by this message then they'll overwrite this value
lastMessageId, err = sequence.FromRedisId(m.ID, 0, true)
if err != nil {
return nil, nil, err
}
for i, msg := range apiEvents {
msgId, err := sequence.FromRedisId(m.ID, i, i == len(apiEvents)-1)
if err != nil {
return nil, err
return nil, nil, err
}
lastMessageId = msgId
if msgId.IsAfter(from) {
messages = append(messages, &api.EventStreamMessage{Id: msgId.String(), Message: msg})
}
}
}
return messages, nil
return messages, lastMessageId, nil
}

func (repo *RedisEventRepository) GetLastMessageId(queue, jobSetId string) (string, error) {
Expand Down
51 changes: 44 additions & 7 deletions internal/armada/repository/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/google/uuid"
"github.com/stretchr/testify/assert"

"github.com/armadaproject/armada/internal/armada/repository/sequence"
"github.com/armadaproject/armada/internal/common/compress"
"github.com/armadaproject/armada/pkg/api"
"github.com/armadaproject/armada/pkg/armadaevents"
Expand Down Expand Up @@ -85,6 +86,26 @@ var running = &armadaevents.EventSequence_Event{
},
}

// runSucceeded is a JobRunSucceeded event fixture for the shared test run/job ids,
// carrying a single pod resource info entry (nodeName, podNumber).
// TestRead stores it as an example of an event that is not expected to produce an api event.
var runSucceeded = &armadaevents.EventSequence_Event{
Created: &baseTime,
Event: &armadaevents.EventSequence_Event_JobRunSucceeded{
JobRunSucceeded: &armadaevents.JobRunSucceeded{
RunId: runIdProto,
JobId: jobIdProto,
ResourceInfos: []*armadaevents.KubernetesResourceInfo{
{
Info: &armadaevents.KubernetesResourceInfo_PodInfo{
PodInfo: &armadaevents.PodInfo{
NodeName: nodeName,
PodNumber: podNumber,
},
},
},
},
},
},
}

var expectedPending = api.EventMessage{
Events: &api.EventMessage_Pending{
Pending: &api.JobPendingEvent{
Expand Down Expand Up @@ -124,20 +145,35 @@ func TestRead(t *testing.T) {
assert.NoError(t, err)

// Fetch from beginning
events, err := r.ReadEvents(testQueue, jobSetName, "", 500, 1*time.Second)
events, lastMessageId, err := r.ReadEvents(testQueue, jobSetName, "", 500, 1*time.Second)
assert.NoError(t, err)
assertExpected(t, events, &expectedPending, &expectedRunning)
assertExpected(t, events, lastMessageId, &expectedPending, &expectedRunning)

// Fetch from offset in the middle
offset := events[0].Id
events, err = r.ReadEvents(testQueue, jobSetName, offset, 500, 1*time.Second)
events, lastMessageId, err = r.ReadEvents(testQueue, jobSetName, offset, 500, 1*time.Second)
assert.NoError(t, err)
assertExpected(t, events, &expectedRunning)
assertExpected(t, events, lastMessageId, &expectedRunning)

// Fetch from offset after
offset = events[0].Id
events, err = r.ReadEvents(testQueue, jobSetName, offset, 500, 1*time.Second)
events, lastMessageId, err = r.ReadEvents(testQueue, jobSetName, offset, 500, 1*time.Second)
assert.NoError(t, err)
assert.Nil(t, lastMessageId)
assert.Equal(t, 0, len(events))

// Fetch for events that won't produce api events
// JobRunSucceeded doesn't result in an api event, so expect:
// - No events
// - Last message Id to be non-nil
err = storeEvents(r, runSucceeded)
assert.NoError(t, err)
offSetId, err := sequence.Parse(offset)
assert.NoError(t, err)
events, lastMessageId, err = r.ReadEvents(testQueue, jobSetName, offSetId.String(), 500, 1*time.Second)
assert.NoError(t, err)
assert.NotNil(t, lastMessageId)
assert.True(t, lastMessageId.IsAfter(offSetId))
assert.Equal(t, 0, len(events))
})
}
Expand All @@ -152,7 +188,7 @@ func TestGetLastId(t *testing.T) {
// Now create the stream and fetch the events to manually determine the last id
err = storeEvents(r, assigned, running)
assert.NoError(t, err)
events, err := r.ReadEvents(testQueue, jobSetName, "", 500, 1*time.Second)
events, _, err := r.ReadEvents(testQueue, jobSetName, "", 500, 1*time.Second)
assert.NoError(t, err)
actualLastId := events[1].Id

Expand Down Expand Up @@ -189,12 +225,13 @@ func withRedisEventRepository(action func(r *RedisEventRepository)) {
action(repo)
}

func assertExpected(t *testing.T, actual []*api.EventStreamMessage, expected ...*api.EventMessage) {
// assertExpected verifies the result of a ReadEvents call.
//
// It checks that actual contains exactly the expected api events, in order, and
// that lastMessageId is the id of the final message in actual. When actual is
// empty there is no final message to compare against, so lastMessageId is not
// checked here; callers assert on it directly in that case.
func assertExpected(t *testing.T, actual []*api.EventStreamMessage, lastMessageId *sequence.ExternalSeqNo, expected ...*api.EventMessage) {
	t.Helper()

	// Stop on a length mismatch: indexing actual below would otherwise panic
	// and hide the real failure.
	if !assert.Equal(t, len(expected), len(actual)) {
		return
	}

	for i, expectedMessage := range expected {
		// Compare against the message that was actually read. Previously the
		// loop compared each expected message with itself, so it could never fail.
		assert.Equal(t, expectedMessage.Events, actual[i].Message.Events)
	}

	if len(actual) == 0 {
		return
	}
	// Guard against a nil id so a regression reports a failure rather than a panic.
	if assert.NotNil(t, lastMessageId) {
		assert.Equal(t, actual[len(actual)-1].Id, lastMessageId.String())
	}
}

func storeEvents(r *RedisEventRepository, events ...*armadaevents.EventSequence_Event) error {
Expand Down
22 changes: 14 additions & 8 deletions internal/armada/server/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,20 +193,26 @@ func (s *EventServer) serveEventsFromRepository(request *api.JobSetRequest, even
default:
}

messages, err := eventRepository.ReadEvents(request.Queue, request.Id, fromId, 500, timeout)
messages, lastMessageId, err := eventRepository.ReadEvents(request.Queue, request.Id, fromId, 500, timeout)
if err != nil {
return status.Errorf(codes.Unavailable, "[GetJobSetEvents] error reading events: %s", err)
}

stop := len(messages) == 0
for _, msg := range messages {
fromId = msg.Id
if fromId == stopAfter {
stop = true
if len(messages) == 0 {
if lastMessageId != nil {
fromId = lastMessageId.String()
}
err = stream.Send(msg)
if err != nil {
return status.Errorf(codes.Unavailable, "[GetJobSetEvents] error sending event: %s", err)
} else {
for _, msg := range messages {
fromId = msg.Id
if fromId == stopAfter {
stop = true
}
err = stream.Send(msg)
if err != nil {
return status.Errorf(codes.Unavailable, "[GetJobSetEvents] error sending event: %s", err)
}
}
}

Expand Down
6 changes: 6 additions & 0 deletions internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,12 @@ func (s *Scheduler) generateUpdateMessagesFromJob(job *jobdb.Job, jobRunErrors m
if failFast {
errorMessage = fmt.Sprintf("Job has fail fast flag set - this job will no longer be retried")
}

if runError != nil && runError.GetPodLeaseReturned() != nil {
errorMessage += "\n\n" + "Final run error:"
errorMessage += "\n" + runError.GetPodLeaseReturned().GetMessage()
}

runError = &armadaevents.Error{
Terminal: true,
Reason: &armadaevents.Error_MaxRunsExceeded{
Expand Down

0 comments on commit 6340bcb

Please sign in to comment.