Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set Meta of Restarted Job as Unused #687

Merged
merged 2 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/api/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func newRunnerWithNotMockedRunnerManager(s *MainTestSuite, apiMock *nomad.Execut
r runner.Runner, wsURL *url.URL, cleanup func(),
) {
s.T().Helper()
apiMock.On("MarkRunnerAsUsed", mock.AnythingOfType("string"), mock.AnythingOfType("int")).Return(nil)
apiMock.On("SetRunnerMetaUsed", mock.AnythingOfType("string"), mock.AnythingOfType("bool"), mock.AnythingOfType("int")).Return(nil)
apiMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil)
apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
apiMock.On("RegisterRunnerJob", mock.AnythingOfType("*api.Job")).Return(nil)
Expand Down
90 changes: 7 additions & 83 deletions internal/nomad/executor_api_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 21 additions & 8 deletions internal/nomad/nomad.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@
ExecuteCommand(ctx context.Context, jobID string, command string, tty bool, privilegedExecution bool,
stdin io.Reader, stdout, stderr io.Writer) (int, error)

// MarkRunnerAsUsed marks the runner with the given ID as used. It also stores the timeout duration in the metadata.
MarkRunnerAsUsed(runnerID string, duration int) error
// SetRunnerMetaUsed marks the runner with the given ID as used or unused.
// If used, the timeout duration is also stored in the metadata.
SetRunnerMetaUsed(runnerID string, used bool, duration int) error
}

// APIClient implements the ExecutorAPI interface and can be used to perform different operations on the real
Expand Down Expand Up @@ -174,18 +175,30 @@
return jobs, occurredError
}

func (a *APIClient) MarkRunnerAsUsed(runnerID string, duration int) error {
func (a *APIClient) SetRunnerMetaUsed(runnerID string, used bool, duration int) error {
newMetaUsedValue := ConfigMetaUsedValue
if !used {
newMetaUsedValue = ConfigMetaUnusedValue

Check warning on line 181 in internal/nomad/nomad.go

View check run for this annotation

Codecov / codecov/patch

internal/nomad/nomad.go#L181

Added line #L181 was not covered by tests
}
newMetaTimeoutValue := strconv.Itoa(duration)

job, err := a.job(runnerID)
if err != nil {
return fmt.Errorf("couldn't retrieve job info: %w", err)
}
configTaskGroup := FindAndValidateConfigTaskGroup(job)
configTaskGroup.Meta[ConfigMetaUsedKey] = ConfigMetaUsedValue
configTaskGroup.Meta[ConfigMetaTimeoutKey] = strconv.Itoa(duration)
metaUsedDiffers := configTaskGroup.Meta[ConfigMetaUsedKey] != newMetaUsedValue
metaTimeoutDiffers := configTaskGroup.Meta[ConfigMetaTimeoutKey] != newMetaTimeoutValue

_, err = a.RegisterNomadJob(job)
if err != nil {
return fmt.Errorf("couldn't update runner config: %w", err)
if metaUsedDiffers || (used && metaTimeoutDiffers) {
configTaskGroup.Meta[ConfigMetaUsedKey] = newMetaUsedValue
if used {
configTaskGroup.Meta[ConfigMetaTimeoutKey] = newMetaTimeoutValue
}
_, err = a.RegisterNomadJob(job)
if err != nil {
return fmt.Errorf("couldn't update runner config: %w", err)

Check warning on line 200 in internal/nomad/nomad.go

View check run for this annotation

Codecov / codecov/patch

internal/nomad/nomad.go#L200

Added line #L200 was not covered by tests
}
}
return nil
}
Expand Down
19 changes: 12 additions & 7 deletions internal/runner/nomad_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,24 +55,24 @@
}

m.usedRunners.Add(runner.ID(), runner)
go m.markRunnerAsUsed(runner, duration)
go m.setRunnerMetaUsed(runner, true, duration)

runner.SetupTimeout(time.Duration(duration) * time.Second)
return runner, nil
}

func (m *NomadRunnerManager) markRunnerAsUsed(runner Runner, timeoutDuration int) {
func (m *NomadRunnerManager) setRunnerMetaUsed(runner Runner, used bool, timeoutDuration int) {
err := util.RetryExponential(func() (err error) {
if err = m.apiClient.MarkRunnerAsUsed(runner.ID(), timeoutDuration); err != nil {
if err = m.apiClient.SetRunnerMetaUsed(runner.ID(), used, timeoutDuration); err != nil {
err = fmt.Errorf("cannot mark runner as used: %w", err)
}
return
})
if err != nil {
log.WithError(err).WithField(dto.KeyRunnerID, runner.ID()).Error("cannot mark runner as used")
log.WithError(err).WithField(dto.KeyRunnerID, runner.ID()).WithField("used", used).Error("cannot mark runner")
err := m.Return(runner)
if err != nil {
log.WithError(err).WithField(dto.KeyRunnerID, runner.ID()).Error("can't mark runner as used and can't return runner")
log.WithError(err).WithField(dto.KeyRunnerID, runner.ID()).Error("can't mark runner and can't return runner")

Check warning on line 75 in internal/runner/nomad_manager.go

View check run for this annotation

Codecov / codecov/patch

internal/runner/nomad_manager.go#L75

Added line #L75 was not covered by tests
}
}
}
Expand Down Expand Up @@ -231,7 +231,7 @@
if err != nil {
log.WithField(dto.KeyRunnerID, newJob.ID()).WithError(err).Warn("failed loading timeout from meta values")
timeout = int(nomad.RunnerTimeoutFallback.Seconds())
go m.markRunnerAsUsed(newJob, timeout)
go m.setRunnerMetaUsed(newJob, true, timeout)
}
newJob.SetupTimeout(time.Duration(timeout) * time.Second)
} else {
Expand Down Expand Up @@ -311,7 +311,12 @@
if alloc.AllocatedResources != nil {
mappedPorts = alloc.AllocatedResources.Shared.Ports
}
environment.AddRunner(NewNomadJob(ctx, alloc.JobID, mappedPorts, m.apiClient, m.onRunnerDestroyed))

r := NewNomadJob(ctx, alloc.JobID, mappedPorts, m.apiClient, m.onRunnerDestroyed)
if alloc.PreviousAllocation != "" {
go m.setRunnerMetaUsed(r, false, 0)
}
environment.AddRunner(r)
go m.checkPrewarmingPoolAlert(ctx, environment, true)
monitorAllocationStartupDuration(startup, alloc.JobID, environmentID)
}
Expand Down
29 changes: 24 additions & 5 deletions internal/runner/nomad_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func mockRunnerQueries(ctx context.Context, apiMock *nomad.ExecutorAPIMock, retu
})
apiMock.On("LoadEnvironmentJobs").Return([]*nomadApi.Job{}, nil)
apiMock.On("LoadRunnerJobs", mock.AnythingOfType("dto.EnvironmentID")).Return([]*nomadApi.Job{}, nil)
apiMock.On("MarkRunnerAsUsed", mock.AnythingOfType("string"), mock.AnythingOfType("int")).Return(nil)
apiMock.On("SetRunnerMetaUsed", mock.AnythingOfType("string"), mock.AnythingOfType("bool"), mock.AnythingOfType("int")).Return(nil)
apiMock.On("LoadRunnerIDs", tests.DefaultRunnerID).Return(returnedRunnerIDs, nil)
apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
apiMock.On("RegisterRunnerJob", mock.Anything).Return(nil)
Expand Down Expand Up @@ -171,7 +171,7 @@ func (s *ManagerTestSuite) TestClaimRemovesRunnerWhenMarkAsUsedFails() {
s.exerciseEnvironment.On("DeleteRunner", mock.AnythingOfType("string")).Return(nil, false)
s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
util.MaxConnectionRetriesExponential = 1
modifyMockedCall(s.apiMock, "MarkRunnerAsUsed", func(call *mock.Call) {
modifyMockedCall(s.apiMock, "SetRunnerMetaUsed", func(call *mock.Call) {
call.Run(func(_ mock.Arguments) {
call.ReturnArguments = mock.Arguments{tests.ErrDefault}
})
Expand Down Expand Up @@ -441,6 +441,25 @@ func (s *ManagerTestSuite) TestOnAllocationAdded() {
s.Require().NoError(err)
})
})
s.nomadRunnerManager.usedRunners.Purge()
s.Run("resets meta used when added allocation has a previous allocation", func() {
environment, ok := s.nomadRunnerManager.environments.Get(tests.DefaultEnvironmentIDAsString)
s.True(ok)
environmentMock, ok := environment.(*ExecutionEnvironmentMock)
s.Require().True(ok)
mockIdleRunners(environmentMock)

alloc := &nomadApi.Allocation{JobID: tests.DefaultRunnerID, PreviousAllocation: tests.DefaultUUID}
s.nomadRunnerManager.onAllocationAdded(s.TestCtx, alloc, 0)

<-time.After(tests.ShortTimeout)
s.apiMock.AssertCalled(s.T(), "SetRunnerMetaUsed", tests.DefaultRunnerID, false, 0)

runner, ok := environment.Sample()
s.True(ok)
err := runner.Destroy(nil)
s.Require().NoError(err)
})
}

func (s *ManagerTestSuite) TestOnAllocationStopped() {
Expand Down Expand Up @@ -571,7 +590,7 @@ func (s *MainTestSuite) TestNomadRunnerManager_Load() {
})

s.Run("Stores used runner", func() {
apiMock.On("MarkRunnerAsUsed", mock.AnythingOfType("string"), mock.AnythingOfType("int")).Return(nil)
apiMock.On("SetRunnerMetaUsed", mock.AnythingOfType("string"), mock.AnythingOfType("bool"), mock.AnythingOfType("int")).Return(nil)
_, job := helpers.CreateTemplateJob()
jobID := tests.DefaultRunnerID
job.ID = &jobID
Expand Down Expand Up @@ -613,7 +632,7 @@ func (s *MainTestSuite) TestNomadRunnerManager_Load() {
})

s.Run("Don't stop running executions", func() {
apiMock.On("MarkRunnerAsUsed", mock.AnythingOfType("string"), mock.AnythingOfType("int")).Return(nil).Once()
apiMock.On("SetRunnerMetaUsed", mock.AnythingOfType("string"), mock.AnythingOfType("bool"), mock.AnythingOfType("int")).Return(nil).Once()
_, job := helpers.CreateTemplateJob()
jobID := tests.DefaultRunnerID
job.ID = &jobID
Expand Down Expand Up @@ -667,7 +686,7 @@ func (s *MainTestSuite) TestNomadRunnerManager_Load() {
apiMock.On("LoadRunnerPortMappings", mock.Anything).
Return([]nomadApi.PortMapping{updatedPortMapping}, nil).Once()

apiMock.On("MarkRunnerAsUsed", mock.Anything, mock.Anything).Return(nil).Once()
apiMock.On("SetRunnerMetaUsed", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
_, job := helpers.CreateTemplateJob()
jobID := tests.DefaultRunnerID
job.ID = &jobID
Expand Down
Loading