From 739260a622aa0d82f3e8b89f679f87b61b29eaf8 Mon Sep 17 00:00:00 2001 From: Adrien CABARBAYE Date: Mon, 17 Jun 2024 16:54:50 +0100 Subject: [PATCH] :bug: [`job`] Wait for job to start before progressing with messages (#74) ### Description - the job manager was not handling the fact a job could be queued for some time and so, no messages would be accessible. ### Test Coverage - [x] This change is covered by existing or additional automated tests. - [ ] Manual testing has been performed (and evidence provided) as automated testing was not feasible. - [ ] Additional tests are not required for this change (e.g. documentation update). --- changes/20240617160218.bugfix | 1 + utils/go.mod | 4 +- utils/go.sum | 8 ++-- utils/job/interfaces.go | 5 ++- utils/job/{ => jobtest}/testing.go | 18 ++++++--- utils/job/manager.go | 60 ++++++++++++++++++++++++++++++ utils/job/manager_test.go | 31 +++++++++++---- utils/mocks/mock_job.go | 29 +++++++++++++++ 8 files changed, 137 insertions(+), 19 deletions(-) create mode 100644 changes/20240617160218.bugfix rename utils/job/{ => jobtest}/testing.go (72%) diff --git a/changes/20240617160218.bugfix b/changes/20240617160218.bugfix new file mode 100644 index 0000000..e5f5535 --- /dev/null +++ b/changes/20240617160218.bugfix @@ -0,0 +1 @@ +:bug: [`job`] Wait for job to start before progressing with messages diff --git a/utils/go.mod b/utils/go.mod index e1afc1b..e7d50ef 100644 --- a/utils/go.mod +++ b/utils/go.mod @@ -3,8 +3,8 @@ module github.com/ARM-software/embedded-development-services-client-utils/utils go 1.22 require ( - github.com/ARM-software/embedded-development-services-client/client v1.31.3 - github.com/ARM-software/golang-utils/utils v1.66.1 + github.com/ARM-software/embedded-development-services-client/client v1.31.4 + github.com/ARM-software/golang-utils/utils v1.68.0 github.com/go-faker/faker/v4 v4.4.2 github.com/go-logr/logr v1.4.2 github.com/golang/mock v1.6.0 diff --git a/utils/go.sum b/utils/go.sum index 86a99ee..157ca7a 100644 --- a/utils/go.sum +++ b/utils/go.sum @@ -1,8 +1,8 @@ bitbucket.org/creachadair/stringset v0.0.9/go.mod h1:t+4WcQ4+PXTa8aQdNKe40ZP6iwesoMFWAxPGd3UGjyY= -github.com/ARM-software/embedded-development-services-client/client v1.31.3 h1:OJTaRUnmfdVBZWaY6ALg3FQ0SRlbASM4bHmL97yxvsU= -github.com/ARM-software/embedded-development-services-client/client v1.31.3/go.mod h1:DfoiGBGolbmt6mud6JG0fCEd2UpMyFaeYsB9d7cVSNo= -github.com/ARM-software/golang-utils/utils v1.66.1 h1:fuaJ6uB5hbKXHVeaitZK5KeDCIryN0WAgkwnbbxUX70= -github.com/ARM-software/golang-utils/utils v1.66.1/go.mod h1:PFxtFy3iRiGvmH6X9EUflQet+VcuUS3WZ0qD/rtTu2A= +github.com/ARM-software/embedded-development-services-client/client v1.31.4 h1:sUtf53bEPaME7GBqPAFCBoVRDdAn6brZAuVnlYfsKns= +github.com/ARM-software/embedded-development-services-client/client v1.31.4/go.mod h1:ZNNMwjBLtXKIUHed1p3EYy71CzFWeUv8C/HLgqlNvY0= +github.com/ARM-software/golang-utils/utils v1.68.0 h1:Sp92FvRhGhzfnvIKHDdLb8slrhcidBNFzB9GkpcsN3E= +github.com/ARM-software/golang-utils/utils v1.68.0/go.mod h1:pjIYW6jPUXH7/kxkZEPyzRwiux+NUG/BFJFn8zaJ/0A= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8= github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= diff --git a/utils/job/interfaces.go b/utils/job/interfaces.go index 925660b..3f13ff1 100644 --- a/utils/job/interfaces.go +++ b/utils/job/interfaces.go @@ -29,13 +29,16 @@ type IAsynchronousJob interface { GetSuccess() bool // GetStatus returns the state the job is in. This is for information only and should not be relied upon as likely to change. Use flags for implementing a state machine. GetStatus() string + // GetQueued returns whether the job is being queued and has not started just yet + GetQueued() bool } // IJobManager defines a manager of asynchronous jobs type IJobManager interface { // HasJobCompleted calls the services to determine whether the job has completed. HasJobCompleted(ctx context.Context, job IAsynchronousJob) (completed bool, err error) - + // HasJobStarted calls the services to determine whether the job has started. + HasJobStarted(ctx context.Context, job IAsynchronousJob) (completed bool, err error) // WaitForJobCompletion waits for a job to complete. WaitForJobCompletion(ctx context.Context, job IAsynchronousJob) (err error) } diff --git a/utils/job/testing.go b/utils/job/jobtest/testing.go similarity index 72% rename from utils/job/testing.go rename to utils/job/jobtest/testing.go index b378cc7..275f4f6 100644 --- a/utils/job/testing.go +++ b/utils/job/jobtest/testing.go @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package job +package jobtest import ( "github.com/go-faker/faker/v4" @@ -18,6 +18,10 @@ type MockAsynchronousJob struct { failure bool } +func (m *MockAsynchronousJob) GetQueued() bool { + return !m.done +} + func (m *MockAsynchronousJob) FetchType() string { return "Mock Asynchronous Job" } @@ -42,7 +46,7 @@ func (m *MockAsynchronousJob) GetStatus() string { return faker.Word() } -func newMockAsynchronousJob(done bool, failure bool) (IAsynchronousJob, error) { +func newMockAsynchronousJob(done bool, failure bool) (*MockAsynchronousJob, error) { r, err := resourcetests.NewMockResource() if err != nil { return nil, err @@ -54,14 +58,18 @@ func newMockAsynchronousJob(done bool, failure bool) (IAsynchronousJob, error) { }, nil } -func NewMockUndoneAsynchronousJob() (IAsynchronousJob, error) { +func NewMockUndoneAsynchronousJob() (*MockAsynchronousJob, error) { + return newMockAsynchronousJob(false, false) +} + +func NewMockQueuedAsynchronousJob() (*MockAsynchronousJob, error) { return newMockAsynchronousJob(false, false) } -func NewMockSuccessfulAsynchronousJob() (IAsynchronousJob, error) { +func NewMockSuccessfulAsynchronousJob() (*MockAsynchronousJob, error) { return newMockAsynchronousJob(true, false) } -func NewMockFailedAsynchronousJob() (IAsynchronousJob, error) { +func NewMockFailedAsynchronousJob() (*MockAsynchronousJob, error) { return newMockAsynchronousJob(true, true) } diff --git a/utils/job/manager.go b/utils/job/manager.go index c66d68b..5bd5f6a 100644 --- a/utils/job/manager.go +++ b/utils/job/manager.go @@ -17,7 +17,9 @@ import ( "github.com/ARM-software/embedded-development-services-client-utils/utils/messages" "github.com/ARM-software/golang-utils/utils/collection/pagination" "github.com/ARM-software/golang-utils/utils/commonerrors" + "github.com/ARM-software/golang-utils/utils/logs" "github.com/ARM-software/golang-utils/utils/parallelisation" + "github.com/ARM-software/golang-utils/utils/retry" ) type Manager struct { @@ -53,6 +55,30 @@ func (m *Manager) FetchJobMessagesFirstPage(ctx context.Context, job IAsynchrono return } +func (m *Manager) waitForJobToStart(ctx context.Context, logger messages.IMessageLogger, job IAsynchronousJob) (err error) { + err = parallelisation.DetermineContextError(ctx) + if err != nil { + return + } + + jobName, err := job.FetchName() + if err != nil { + return + } + notStartedError := fmt.Errorf("%w: job [%v] has not started", commonerrors.ErrCondition, jobName) + err = retry.RetryOnError(ctx, logs.NewPlainLogrLoggerFromLoggers(logger), retry.DefaultExponentialBackoffRetryPolicyConfiguration(), func() error { + started, subErr := m.HasJobStarted(ctx, job) + if subErr != nil { + return subErr + } + if started { + return nil + } + return notStartedError + }, fmt.Sprintf("Waiting for job [%v] to start...", jobName), notStartedError) + return +} + func (m *Manager) createMessagePaginator(ctx context.Context, job IAsynchronousJob) (paginator pagination.IStreamPaginatorAndPageFetcher, err error) { paginator, err = m.messagesPaginatorFactory.Create(ctx, func(subCtx context.Context) (pagination.IStaticPageStream, error) { return m.FetchJobMessagesFirstPage(subCtx, job) @@ -74,6 +100,10 @@ func (m *Manager) WaitForJobCompletion(ctx context.Context, job IAsynchronousJob _ = messageLogger.Close() } }() + err = m.waitForJobToStart(ctx, messageLogger, job) + if err != nil { + return + } messagePaginator, err := m.createMessagePaginator(ctx, job) if err != nil { return @@ -119,6 +149,36 @@ func (m *Manager) checkForMessageStreamExhaustion(ctx context.Context, paginator } } +func (m *Manager) HasJobStarted(ctx context.Context, job IAsynchronousJob) (started bool, err error) { + err = parallelisation.DetermineContextError(ctx) + if err != nil { + return + } + if job == nil { + err = fmt.Errorf("%w: missing job", commonerrors.ErrUndefined) + return + } + jobName, err := job.FetchName() + if err != nil { + return + } + jobType := job.FetchType() + jobStatus, resp, apierr := m.fetchJobStatusFunc(ctx, jobName) + if resp != nil { + _ = resp.Body.Close() + } + err = api.CheckAPICallSuccess(ctx, fmt.Sprintf("could not fetch %v [%v]'s status", jobType, jobName), resp, apierr) + if err != nil { + return + } + if jobStatus.GetDone() { + started = true + } else { + started = !jobStatus.GetQueued() + } + return +} + func (m *Manager) HasJobCompleted(ctx context.Context, job IAsynchronousJob) (completed bool, err error) { err = parallelisation.DetermineContextError(ctx) if err != nil { diff --git a/utils/job/manager_test.go b/utils/job/manager_test.go index 086338a..55a584d 100644 --- a/utils/job/manager_test.go +++ b/utils/job/manager_test.go @@ -18,6 +18,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/goleak" + "github.com/ARM-software/embedded-development-services-client-utils/utils/job/jobtest" "github.com/ARM-software/embedded-development-services-client-utils/utils/logging" "github.com/ARM-software/embedded-development-services-client-utils/utils/messages" pagination2 "github.com/ARM-software/embedded-development-services-client-utils/utils/pagination" @@ -41,17 +42,22 @@ func TestManager_HasJobCompleted(t *testing.T) { expectedError: commonerrors.ErrUndefined, }, { - jobFunc: NewMockFailedAsynchronousJob, + jobFunc: mapFunc(jobtest.NewMockFailedAsynchronousJob), expectCompleted: true, expectedError: commonerrors.ErrInvalid, }, { - jobFunc: NewMockUndoneAsynchronousJob, + jobFunc: mapFunc(jobtest.NewMockUndoneAsynchronousJob), expectCompleted: false, expectedError: nil, }, { - jobFunc: NewMockSuccessfulAsynchronousJob, + jobFunc: mapFunc(jobtest.NewMockQueuedAsynchronousJob), + expectCompleted: false, + expectedError: nil, + }, + { + jobFunc: mapFunc(jobtest.NewMockSuccessfulAsynchronousJob), expectCompleted: true, expectedError: nil, }, @@ -92,11 +98,11 @@ func TestManager_checkForMessageStreamExhaustion(t *testing.T) { expectedError: commonerrors.ErrUndefined, }, { - jobFunc: NewMockFailedAsynchronousJob, + jobFunc: mapFunc(jobtest.NewMockFailedAsynchronousJob), expectedError: nil, }, { - jobFunc: NewMockSuccessfulAsynchronousJob, + jobFunc: mapFunc(jobtest.NewMockSuccessfulAsynchronousJob), expectedError: nil, }, } @@ -132,6 +138,13 @@ func TestManager_checkForMessageStreamExhaustion(t *testing.T) { } } +func mapFunc(f func() (*jobtest.MockAsynchronousJob, error)) func() (IAsynchronousJob, error) { + return func() (IAsynchronousJob, error) { + job, err := f() + return job, err + } +} + func TestManager_WaitForJobCompletion(t *testing.T) { defer goleak.VerifyNone(t) tests := []struct { @@ -139,11 +152,15 @@ func TestManager_WaitForJobCompletion(t *testing.T) { expectedError error }{ { - jobFunc: NewMockFailedAsynchronousJob, + jobFunc: mapFunc(jobtest.NewMockFailedAsynchronousJob), expectedError: commonerrors.ErrInvalid, }, { - jobFunc: NewMockSuccessfulAsynchronousJob, + jobFunc: mapFunc(jobtest.NewMockQueuedAsynchronousJob), + expectedError: commonerrors.ErrCondition, + }, + { + jobFunc: mapFunc(jobtest.NewMockSuccessfulAsynchronousJob), expectedError: nil, }, } diff --git a/utils/mocks/mock_job.go b/utils/mocks/mock_job.go index 3dc3df3..808b291 100644 --- a/utils/mocks/mock_job.go +++ b/utils/mocks/mock_job.go @@ -142,6 +142,20 @@ func (mr *MockIAsynchronousJobMockRecorder) GetFailure() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetFailure", reflect.TypeOf((*MockIAsynchronousJob)(nil).GetFailure)) } +// GetQueued mocks base method. +func (m *MockIAsynchronousJob) GetQueued() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetQueued") + ret0, _ := ret[0].(bool) + return ret0 +} + +// GetQueued indicates an expected call of GetQueued. +func (mr *MockIAsynchronousJobMockRecorder) GetQueued() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetQueued", reflect.TypeOf((*MockIAsynchronousJob)(nil).GetQueued)) +} + // GetStatus mocks base method. func (m *MockIAsynchronousJob) GetStatus() string { m.ctrl.T.Helper() @@ -208,6 +222,21 @@ func (mr *MockIJobManagerMockRecorder) HasJobCompleted(arg0, arg1 interface{}) * return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasJobCompleted", reflect.TypeOf((*MockIJobManager)(nil).HasJobCompleted), arg0, arg1) } +// HasJobStarted mocks base method. +func (m *MockIJobManager) HasJobStarted(arg0 context.Context, arg1 job.IAsynchronousJob) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "HasJobStarted", arg0, arg1) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// HasJobStarted indicates an expected call of HasJobStarted. +func (mr *MockIJobManagerMockRecorder) HasJobStarted(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasJobStarted", reflect.TypeOf((*MockIJobManager)(nil).HasJobStarted), arg0, arg1) +} + // WaitForJobCompletion mocks base method. func (m *MockIJobManager) WaitForJobCompletion(arg0 context.Context, arg1 job.IAsynchronousJob) error { m.ctrl.T.Helper()