From 6252699cb5e74ef0bedcfcc2a8e505490f53850d Mon Sep 17 00:00:00 2001 From: Maximilian Pass <22845248+mpass99@users.noreply.github.com> Date: Sun, 8 Sep 2024 14:14:34 +0200 Subject: [PATCH] Add Nomad Job Event Handling for more reliable event handling. In recent cases, we have observed Nomad not sending Allocation stopping events but just JobDeregistered events. We add handling of those events to also capture these cases. --- internal/nomad/event_stream_handling.go | 69 +++++++++++++++----- internal/nomad/event_stream_handling_test.go | 62 ++++++++++++++---- internal/nomad/nomad.go | 7 ++ pkg/monitoring/influxdb2_middleware.go | 2 + 4 files changed, 112 insertions(+), 28 deletions(-) diff --git a/internal/nomad/event_stream_handling.go b/internal/nomad/event_stream_handling.go index 902a07af..201da55d 100644 --- a/internal/nomad/event_stream_handling.go +++ b/internal/nomad/event_stream_handling.go @@ -65,6 +65,7 @@ func (a *APIClient) initializeAllocations(environmentID dto.EnvironmentID) { continue case stub.ClientStatus == structs.AllocClientStatusPending || stub.ClientStatus == structs.AllocClientStatusRunning: log.WithField("jobID", stub.JobID).WithField("status", stub.ClientStatus).Debug("Recovered Allocation") + a.jobAllocationMapping.Add(stub.JobID, stub.ID) a.allocations.Add(stub.ID, &allocationData{ allocClientStatus: stub.ClientStatus, allocDesiredStatus: stub.DesiredStatus, @@ -90,7 +91,9 @@ func (a *APIClient) WatchEventStream(ctx context.Context, callbacks *AllocationP case nomadApi.TopicEvaluation: return false, handleEvaluationEvent(a.evaluations, event) case nomadApi.TopicAllocation: - return false, handleAllocationEvent(ctx, startTime, a.allocations, event, callbacks) + return false, handleAllocationEvent(ctx, startTime, a.jobAllocationMapping, a.allocations, event, callbacks) + case nomadApi.TopicJob: + return false, handleJobEvent(ctx, a.jobAllocationMapping, a.allocations, event, callbacks) default: return false, nil } @@ -211,8 +214,8 @@ func 
checkEvaluation(eval *nomadApi.Evaluation) (err error) { // If a new allocation is received, onNewAllocation is called. If an allocation is deleted, onDeletedAllocation // is called. The allocations storage is used to track pending and running allocations. Using the // storage the state is persisted between multiple calls of this function. -func handleAllocationEvent(ctx context.Context, startTime int64, allocations storage.Storage[*allocationData], - event *nomadApi.Event, callbacks *AllocationProcessing, +func handleAllocationEvent(ctx context.Context, startTime int64, jobMapping storage.Storage[string], + allocations storage.Storage[*allocationData], event *nomadApi.Event, callbacks *AllocationProcessing, ) error { alloc, err := event.Allocation() if err != nil { @@ -243,21 +246,40 @@ func handleAllocationEvent(ctx context.Context, startTime int64, allocations sto switch alloc.ClientStatus { case structs.AllocClientStatusPending: - handlePendingAllocationEvent(ctx, alloc, allocData, allocations, callbacks) + handlePendingAllocationEvent(ctx, alloc, allocData, jobMapping, allocations, callbacks) case structs.AllocClientStatusRunning: - handleRunningAllocationEvent(ctx, alloc, allocData, allocations, callbacks) + handleRunningAllocationEvent(ctx, alloc, allocData, jobMapping, allocations, callbacks) case structs.AllocClientStatusComplete: handleCompleteAllocationEvent(ctx, alloc, allocData, allocations, callbacks) case structs.AllocClientStatusFailed: - handleFailedAllocationEvent(ctx, alloc, allocData, allocations, callbacks) + handleFailedAllocationEvent(ctx, alloc, allocData, jobMapping, allocations, callbacks) case structs.AllocClientStatusLost: - handleLostAllocationEvent(ctx, alloc, allocData, allocations, callbacks) + handleLostAllocationEvent(ctx, alloc, allocData, jobMapping, allocations, callbacks) default: log.WithField("alloc", alloc).Warn("Other Client Status") } return nil } +// handleJobEvent is an event handler that processes job events. 
+// On a JobDeregistered event, onDeletedAllocation is called. +func handleJobEvent(ctx context.Context, jobMapping storage.Storage[string], + allocations storage.Storage[*allocationData], event *nomadApi.Event, callbacks *AllocationProcessing, +) error { + // At this point, we do not filter out events that happened before the subscription to the event stream + // (and are left in Nomad's buffer) because the job events do only have index information instead of time information. + // As we currently only handle `JobDeregistered` events for tracked allocations, the handling of old events does not + // change the behavior. + + switch event.Type { + case structs.TypeJobDeregistered: + return handleDeregisteredJobEvent(ctx, event.Key, jobMapping, allocations, callbacks) + default: + log.WithField("event", event).Trace("Ignored Job Event") + } + return nil +} + // filterDuplicateEvents identifies duplicate events or events of unknown allocations. func filterDuplicateEvents(alloc *nomadApi.Allocation, allocations storage.Storage[*allocationData]) (valid bool) { newAllocationExpected := alloc.ClientStatus == structs.AllocClientStatusPending && @@ -299,7 +321,7 @@ func updateAllocationData( // handlePendingAllocationEvent manages allocation that are currently pending. // This allows the handling of startups and re-placements of allocations. func handlePendingAllocationEvent(ctx context.Context, alloc *nomadApi.Allocation, allocData *allocationData, - allocations storage.Storage[*allocationData], callbacks *AllocationProcessing, + jobMapping storage.Storage[string], allocations storage.Storage[*allocationData], callbacks *AllocationProcessing, ) { var stopExpected bool switch alloc.DesiredStatus { @@ -315,12 +337,14 @@ func handlePendingAllocationEvent(ctx context.Context, alloc *nomadApi.Allocatio // Handle Runner (/Container) re-allocations. 
if prevData, ok := allocations.Get(alloc.PreviousAllocation); ok { stopExpected = callbacks.OnDeleted(ctx, prevData.jobID, ErrAllocationRescheduled) + jobMapping.Delete(alloc.JobID) allocations.Delete(alloc.PreviousAllocation) } else { log.WithField("alloc", alloc).Warn("Previous Allocation not found") } } + jobMapping.Add(alloc.JobID, alloc.ID) // Store Pending Allocation - Allocation gets started, wait until it runs. allocations.Add(alloc.ID, &allocationData{ allocClientStatus: alloc.ClientStatus, @@ -332,6 +356,7 @@ func handlePendingAllocationEvent(ctx context.Context, alloc *nomadApi.Allocatio }) case structs.AllocDesiredStatusStop: // As this allocation was still pending, we don't have to propagate its deletion. + jobMapping.Delete(alloc.JobID) allocations.Delete(alloc.ID) // Anyway, we want to monitor the occurrences. if !allocData.stopExpected { @@ -347,7 +372,7 @@ func handlePendingAllocationEvent(ctx context.Context, alloc *nomadApi.Allocatio // handleRunningAllocationEvent calls the passed AllocationProcessor filtering similar events. 
func handleRunningAllocationEvent(ctx context.Context, alloc *nomadApi.Allocation, allocData *allocationData, - allocations storage.Storage[*allocationData], callbacks *AllocationProcessing, + jobMapping storage.Storage[string], allocations storage.Storage[*allocationData], callbacks *AllocationProcessing, ) { switch alloc.DesiredStatus { case structs.AllocDesiredStatusRun: @@ -355,12 +380,25 @@ func handleRunningAllocationEvent(ctx context.Context, alloc *nomadApi.Allocatio callbacks.OnNew(ctx, alloc, startupDuration) case structs.AllocDesiredStatusStop: callbacks.OnDeleted(ctx, alloc.JobID, ErrAllocationCompleted) + jobMapping.Delete(alloc.JobID) allocations.Delete(alloc.ID) default: log.WithField("alloc", alloc).Warn("Other Desired Status") } } +func handleDeregisteredJobEvent(ctx context.Context, jobID string, + jobMapping storage.Storage[string], allocations storage.Storage[*allocationData], callbacks *AllocationProcessing, +) error { + if allocID, ok := jobMapping.Pop(jobID); ok { + // If the allocData is already removed, another event has already handled the Deletion (or the allocation was not tracked by Poseidon). + if _, ok = allocations.Pop(allocID); ok { + callbacks.OnDeleted(ctx, jobID, ErrJobDeregistered) + } + } + return nil +} + // handleCompleteAllocationEvent handles allocations that stopped. func handleCompleteAllocationEvent(_ context.Context, alloc *nomadApi.Allocation, _ *allocationData, allocations storage.Storage[*allocationData], _ *AllocationProcessing, @@ -381,24 +419,24 @@ func handleCompleteAllocationEvent(_ context.Context, alloc *nomadApi.Allocation // handleFailedAllocationEvent logs only the last of the multiple failure events. 
func handleFailedAllocationEvent(ctx context.Context, alloc *nomadApi.Allocation, allocData *allocationData, - allocations storage.Storage[*allocationData], callbacks *AllocationProcessing, + jobMapping storage.Storage[string], allocations storage.Storage[*allocationData], callbacks *AllocationProcessing, ) { // The stop is expected when the allocation desired to stop even before it failed. reschedulingExpected := allocData.allocDesiredStatus != structs.AllocDesiredStatusStop - handleStoppingAllocationEvent(ctx, alloc, allocations, callbacks, reschedulingExpected) + handleStoppingAllocationEvent(ctx, alloc, jobMapping, allocations, callbacks, reschedulingExpected) } // handleLostAllocationEvent logs only the last of the multiple lost events. func handleLostAllocationEvent(ctx context.Context, alloc *nomadApi.Allocation, allocData *allocationData, - allocations storage.Storage[*allocationData], callbacks *AllocationProcessing, + jobMapping storage.Storage[string], allocations storage.Storage[*allocationData], callbacks *AllocationProcessing, ) { // The stop is expected when the allocation desired to stop even before it got lost. 
reschedulingExpected := allocData.allocDesiredStatus != structs.AllocDesiredStatusStop - handleStoppingAllocationEvent(ctx, alloc, allocations, callbacks, reschedulingExpected) + handleStoppingAllocationEvent(ctx, alloc, jobMapping, allocations, callbacks, reschedulingExpected) } -func handleStoppingAllocationEvent(ctx context.Context, alloc *nomadApi.Allocation, allocations storage.Storage[*allocationData], - callbacks *AllocationProcessing, reschedulingExpected bool, +func handleStoppingAllocationEvent(ctx context.Context, alloc *nomadApi.Allocation, jobMapping storage.Storage[string], + allocations storage.Storage[*allocationData], callbacks *AllocationProcessing, reschedulingExpected bool, ) { replacementAllocationScheduled := alloc.NextAllocation != "" correctRescheduling := reschedulingExpected == replacementAllocationScheduled @@ -412,6 +450,7 @@ func handleStoppingAllocationEvent(ctx context.Context, alloc *nomadApi.Allocati reason = ErrAllocationRescheduledUnexpectedly } removedByPoseidon = callbacks.OnDeleted(ctx, alloc.JobID, reason) + jobMapping.Delete(alloc.JobID) allocations.Delete(alloc.ID) } diff --git a/internal/nomad/event_stream_handling_test.go b/internal/nomad/event_stream_handling_test.go index 08f4a3d0..b74a2f3a 100644 --- a/internal/nomad/event_stream_handling_test.go +++ b/internal/nomad/event_stream_handling_test.go @@ -47,7 +47,10 @@ func (s *MainTestSuite) TestApiClient_MonitorEvaluationReturnsErrorWhenStreamRet apiMock := &apiQuerierMock{} apiMock.On("EventStream", mock.AnythingOfType("*context.cancelCtx")). 
Return(nil, tests.ErrDefault) - apiClient := &APIClient{apiMock, storage.NewLocalStorage[chan error](), storage.NewLocalStorage[*allocationData](), false} + apiClient := &APIClient{ + apiMock, storage.NewLocalStorage[chan error](), + storage.NewLocalStorage[string](), storage.NewLocalStorage[*allocationData](), false, + } err := apiClient.MonitorEvaluation(context.Background(), "id") s.ErrorIs(err, tests.ErrDefault) } @@ -316,6 +319,29 @@ func (s *MainTestSuite) TestApiClient_WatchAllocationsUsesCallbacksForEvents() { []*nomadApi.Allocation{startedAllocation, startedAllocation}, []string{startedAllocation.JobID}) }) + deregisteredEvent := nomadApi.Event{Topic: nomadApi.TopicJob, Type: structs.TypeJobDeregistered, Key: tests.DefaultRunnerID} + deregisteredEvents := nomadApi.Events{Events: []nomadApi.Event{deregisteredEvent}} + pendingStartDeregisteredEvents := nomadApi.Events{Events: []nomadApi.Event{ + eventForAllocation(s.T(), pendingAllocation), + eventForAllocation(s.T(), startedAllocation), + deregisteredEvent, + }} + + s.Run("JobDeregistered behaves like Allocation stopping", func() { + assertWatchAllocation(s, []*nomadApi.Events{&pendingStartDeregisteredEvents}, + []*nomadApi.Allocation{startedAllocation}, []string{startedAllocation.JobID}) + }) + + s.Run("onDelete Handler is not called twice on duplicate JobDeregistered", func() { + assertWatchAllocation(s, []*nomadApi.Events{&pendingStartDeregisteredEvents, &deregisteredEvents}, + []*nomadApi.Allocation{startedAllocation}, []string{startedAllocation.JobID}) + }) + + s.Run("onDelete Handler is not called twice on JobDeregistered and Allocation stopping", func() { + assertWatchAllocation(s, []*nomadApi.Events{&pendingStartDeregisteredEvents, &stoppingEvents}, + []*nomadApi.Allocation{startedAllocation}, []string{startedAllocation.JobID}) + }) + rescheduleAllocation := createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusRun) rescheduleAllocation.ID = tests.AnotherUUID 
rescheduleAllocation.PreviousAllocation = pendingAllocation.ID @@ -374,7 +400,7 @@ func (s *MainTestSuite) TestHandleAllocationEventBuffersPendingAllocation() { allocations := storage.NewLocalStorage[*allocationData]() err := handleAllocationEvent(s.TestCtx, - time.Now().UnixNano(), allocations, &newPendingEvent, noopAllocationProcessing) + time.Now().UnixNano(), storage.NewLocalStorage[string](), allocations, &newPendingEvent, noopAllocationProcessing) s.Require().NoError(err) _, ok := allocations.Get(newPendingAllocation.ID) @@ -387,7 +413,7 @@ func (s *MainTestSuite) TestHandleAllocationEventBuffersPendingAllocation() { allocations := storage.NewLocalStorage[*allocationData]() err := handleAllocationEvent(s.TestCtx, - time.Now().UnixNano(), allocations, &newPendingEvent, noopAllocationProcessing) + time.Now().UnixNano(), storage.NewLocalStorage[string](), allocations, &newPendingEvent, noopAllocationProcessing) s.Require().NoError(err) _, ok := allocations.Get(newPendingAllocation.ID) @@ -451,13 +477,14 @@ func (s *MainTestSuite) TestHandleAllocationEvent_ReportsOOMKilledStatus() { allocations.Add(restartedAllocation.ID, &allocationData{jobID: restartedAllocation.JobID}) var reason error - err := handleAllocationEvent(s.TestCtx, time.Now().UnixNano(), allocations, &restartedEvent, &AllocationProcessing{ - OnNew: func(_ context.Context, _ *nomadApi.Allocation, _ time.Duration) {}, - OnDeleted: func(_ context.Context, _ string, r error) bool { - reason = r - return true - }, - }) + err := handleAllocationEvent(s.TestCtx, time.Now().UnixNano(), storage.NewLocalStorage[string](), + allocations, &restartedEvent, &AllocationProcessing{ + OnNew: func(_ context.Context, _ *nomadApi.Allocation, _ time.Duration) {}, + OnDeleted: func(_ context.Context, _ string, r error) bool { + reason = r + return true + }, + }) s.Require().NoError(err) s.ErrorIs(reason, ErrOOMKilled) } @@ -465,7 +492,10 @@ func (s *MainTestSuite) TestHandleAllocationEvent_ReportsOOMKilledStatus() { 
func (s *MainTestSuite) TestAPIClient_WatchAllocationsReturnsErrorWhenAllocationStreamCannotBeRetrieved() { apiMock := &apiQuerierMock{} apiMock.On("EventStream", mock.Anything).Return(nil, tests.ErrDefault) - apiClient := &APIClient{apiMock, storage.NewLocalStorage[chan error](), storage.NewLocalStorage[*allocationData](), false} + apiClient := &APIClient{ + apiMock, storage.NewLocalStorage[chan error](), + storage.NewLocalStorage[string](), storage.NewLocalStorage[*allocationData](), false, + } err := apiClient.WatchEventStream(context.Background(), noopAllocationProcessing) s.ErrorIs(err, tests.ErrDefault) @@ -618,7 +648,10 @@ func asynchronouslyWatchAllocations(stream chan *nomadApi.Events, callbacks *All apiMock := &apiQuerierMock{} apiMock.On("EventStream", ctx).Return(readOnlyStream, nil) - apiClient := &APIClient{apiMock, storage.NewLocalStorage[chan error](), storage.NewLocalStorage[*allocationData](), false} + apiClient := &APIClient{ + apiMock, storage.NewLocalStorage[chan error](), + storage.NewLocalStorage[string](), storage.NewLocalStorage[*allocationData](), false, + } errChan := make(chan error) go func() { @@ -733,7 +766,10 @@ func asynchronouslyMonitorEvaluation(stream <-chan *nomadApi.Events) chan error apiMock := &apiQuerierMock{} apiMock.On("EventStream", mock.AnythingOfType("*context.cancelCtx")). Return(readOnlyStream, nil) - apiClient := &APIClient{apiMock, storage.NewLocalStorage[chan error](), storage.NewLocalStorage[*allocationData](), false} + apiClient := &APIClient{ + apiMock, storage.NewLocalStorage[chan error](), + storage.NewLocalStorage[string](), storage.NewLocalStorage[*allocationData](), false, + } errChan := make(chan error) go func() { diff --git a/internal/nomad/nomad.go b/internal/nomad/nomad.go index affe3088..34a97a48 100644 --- a/internal/nomad/nomad.go +++ b/internal/nomad/nomad.go @@ -35,6 +35,7 @@ var ( // We do not consider it as an error but add it anyway for a complete reporting. 
// It is a ErrLocalDestruction because another allocation might be replacing the allocation in the same job. ErrAllocationCompleted RunnerDeletedReason = fmt.Errorf("the allocation completed: %w", ErrLocalDestruction) + ErrJobDeregistered RunnerDeletedReason = fmt.Errorf("the job got deregistered: %w", ErrLocalDestruction) ) type RunnerDeletedReason error @@ -88,6 +89,8 @@ type ExecutorAPI interface { type APIClient struct { apiQuerier evaluations storage.Storage[chan error] + // jobAllocationMapping maps a Job ID to the most recent Allocation ID. + jobAllocationMapping storage.Storage[string] // allocations contain management data for all pending and running allocations. allocations storage.Storage[*allocationData] isListening bool @@ -99,6 +102,10 @@ func NewExecutorAPI(ctx context.Context, nomadConfig *config.Nomad) (ExecutorAPI client := &APIClient{ apiQuerier: &nomadAPIClient{}, evaluations: storage.NewLocalStorage[chan error](), + jobAllocationMapping: storage.NewMonitoredLocalStorage[string](ctx, monitoring.MeasurementNomadJobs, + func(p *write.Point, allocationID string, _ storage.EventType) { + p.AddTag(monitoring.InfluxKeyAllocationID, allocationID) + }, 0), allocations: storage.NewMonitoredLocalStorage[*allocationData](ctx, monitoring.MeasurementNomadAllocations, func(p *write.Point, object *allocationData, _ storage.EventType) { p.AddTag(monitoring.InfluxKeyJobID, object.jobID) diff --git a/pkg/monitoring/influxdb2_middleware.go b/pkg/monitoring/influxdb2_middleware.go index 334374ae..1f7b143c 100644 --- a/pkg/monitoring/influxdb2_middleware.go +++ b/pkg/monitoring/influxdb2_middleware.go @@ -25,6 +25,7 @@ const ( measurementPrefix = "poseidon_" measurementPoolSize = measurementPrefix + "poolsize" MeasurementNomadEvents = measurementPrefix + "nomad_events" + MeasurementNomadJobs = measurementPrefix + "nomad_jobs" MeasurementNomadAllocations = measurementPrefix + "nomad_allocations" MeasurementIdleRunnerNomad = measurementPrefix + "nomad_idle_runners" 
MeasurementExecutionsAWS = measurementPrefix + "aws_executions" @@ -38,6 +39,7 @@ const ( InfluxKeyRunnerID = dto.KeyRunnerID InfluxKeyEnvironmentID = dto.KeyEnvironmentID InfluxKeyJobID = "job_id" + InfluxKeyAllocationID = "allocation_id" InfluxKeyClientStatus = "client_status" InfluxKeyNomadNode = "nomad_agent" InfluxKeyActualContentLength = "actual_length"