Skip to content

Commit

Permalink
🐛 [job] Wait for job to start before progressing with messages (#74)
Browse files Browse the repository at this point in the history
<!--
Copyright (C) 2020-2022 Arm Limited or its affiliates and Contributors.
All rights reserved.
SPDX-License-Identifier: Proprietary
-->
### Description

- the job manager was not handling the fact a job could be queued for
some time and so, no messages would be accessible.

### Test Coverage

<!--
Please put an `x` in the correct box e.g. `[x]` to indicate the testing
coverage of this change.
-->

- [x]  This change is covered by existing or additional automated tests.
- [ ] Manual testing has been performed (and evidence provided) as
automated testing was not feasible.
- [ ] Additional tests are not required for this change (e.g.
documentation update).
  • Loading branch information
acabarbaye authored Jun 17, 2024
1 parent 1105dc9 commit 739260a
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 19 deletions.
1 change: 1 addition & 0 deletions changes/20240617160218.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
:bug: [`job`] Wait for job to start before progressing with messages
4 changes: 2 additions & 2 deletions utils/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ module github.com/ARM-software/embedded-development-services-client-utils/utils
go 1.22

require (
github.com/ARM-software/embedded-development-services-client/client v1.31.3
github.com/ARM-software/golang-utils/utils v1.66.1
github.com/ARM-software/embedded-development-services-client/client v1.31.4
github.com/ARM-software/golang-utils/utils v1.68.0
github.com/go-faker/faker/v4 v4.4.2
github.com/go-logr/logr v1.4.2
github.com/golang/mock v1.6.0
Expand Down
8 changes: 4 additions & 4 deletions utils/go.sum
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
bitbucket.org/creachadair/stringset v0.0.9/go.mod h1:t+4WcQ4+PXTa8aQdNKe40ZP6iwesoMFWAxPGd3UGjyY=
github.com/ARM-software/embedded-development-services-client/client v1.31.3 h1:OJTaRUnmfdVBZWaY6ALg3FQ0SRlbASM4bHmL97yxvsU=
github.com/ARM-software/embedded-development-services-client/client v1.31.3/go.mod h1:DfoiGBGolbmt6mud6JG0fCEd2UpMyFaeYsB9d7cVSNo=
github.com/ARM-software/golang-utils/utils v1.66.1 h1:fuaJ6uB5hbKXHVeaitZK5KeDCIryN0WAgkwnbbxUX70=
github.com/ARM-software/golang-utils/utils v1.66.1/go.mod h1:PFxtFy3iRiGvmH6X9EUflQet+VcuUS3WZ0qD/rtTu2A=
github.com/ARM-software/embedded-development-services-client/client v1.31.4 h1:sUtf53bEPaME7GBqPAFCBoVRDdAn6brZAuVnlYfsKns=
github.com/ARM-software/embedded-development-services-client/client v1.31.4/go.mod h1:ZNNMwjBLtXKIUHed1p3EYy71CzFWeUv8C/HLgqlNvY0=
github.com/ARM-software/golang-utils/utils v1.68.0 h1:Sp92FvRhGhzfnvIKHDdLb8slrhcidBNFzB9GkpcsN3E=
github.com/ARM-software/golang-utils/utils v1.68.0/go.mod h1:pjIYW6jPUXH7/kxkZEPyzRwiux+NUG/BFJFn8zaJ/0A=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
Expand Down
5 changes: 4 additions & 1 deletion utils/job/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,16 @@ type IAsynchronousJob interface {
GetSuccess() bool
// GetStatus returns the state the job is in. This is for information only and should not be relied upon as likely to change. Use flags for implementing a state machine.
GetStatus() string
// GetQueued returns whether the job is being queued and has not started just yet
GetQueued() bool
}

// IJobManager defines a manager of asynchronous jobs
type IJobManager interface {
// HasJobCompleted calls the services to determine whether the job has completed.
HasJobCompleted(ctx context.Context, job IAsynchronousJob) (completed bool, err error)

// HasJobStarted calls the services to determine whether the job has started.
HasJobStarted(ctx context.Context, job IAsynchronousJob) (completed bool, err error)
// WaitForJobCompletion waits for a job to complete.
WaitForJobCompletion(ctx context.Context, job IAsynchronousJob) (err error)
}
18 changes: 13 additions & 5 deletions utils/job/testing.go → utils/job/jobtest/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package job
package jobtest

import (
"github.com/go-faker/faker/v4"
Expand All @@ -18,6 +18,10 @@ type MockAsynchronousJob struct {
failure bool
}

func (m *MockAsynchronousJob) GetQueued() bool {
return !m.done
}

func (m *MockAsynchronousJob) FetchType() string {
return "Mock Asynchronous Job"
}
Expand All @@ -42,7 +46,7 @@ func (m *MockAsynchronousJob) GetStatus() string {
return faker.Word()
}

func newMockAsynchronousJob(done bool, failure bool) (IAsynchronousJob, error) {
func newMockAsynchronousJob(done bool, failure bool) (*MockAsynchronousJob, error) {
r, err := resourcetests.NewMockResource()
if err != nil {
return nil, err
Expand All @@ -54,14 +58,18 @@ func newMockAsynchronousJob(done bool, failure bool) (IAsynchronousJob, error) {
}, nil
}

func NewMockUndoneAsynchronousJob() (IAsynchronousJob, error) {
func NewMockUndoneAsynchronousJob() (*MockAsynchronousJob, error) {
return newMockAsynchronousJob(false, false)
}

func NewMockQueuedAsynchronousJob() (*MockAsynchronousJob, error) {
return newMockAsynchronousJob(false, false)
}

func NewMockSuccessfulAsynchronousJob() (IAsynchronousJob, error) {
func NewMockSuccessfulAsynchronousJob() (*MockAsynchronousJob, error) {
return newMockAsynchronousJob(true, false)
}

func NewMockFailedAsynchronousJob() (IAsynchronousJob, error) {
func NewMockFailedAsynchronousJob() (*MockAsynchronousJob, error) {
return newMockAsynchronousJob(true, true)
}
60 changes: 60 additions & 0 deletions utils/job/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import (
"github.com/ARM-software/embedded-development-services-client-utils/utils/messages"
"github.com/ARM-software/golang-utils/utils/collection/pagination"
"github.com/ARM-software/golang-utils/utils/commonerrors"
"github.com/ARM-software/golang-utils/utils/logs"
"github.com/ARM-software/golang-utils/utils/parallelisation"
"github.com/ARM-software/golang-utils/utils/retry"
)

type Manager struct {
Expand Down Expand Up @@ -53,6 +55,30 @@ func (m *Manager) FetchJobMessagesFirstPage(ctx context.Context, job IAsynchrono
return
}

func (m *Manager) waitForJobToStart(ctx context.Context, logger messages.IMessageLogger, job IAsynchronousJob) (err error) {
err = parallelisation.DetermineContextError(ctx)
if err != nil {
return
}

jobName, err := job.FetchName()
if err != nil {
return
}
notStartedError := fmt.Errorf("%w: job [%v] has not started", commonerrors.ErrCondition, jobName)
err = retry.RetryOnError(ctx, logs.NewPlainLogrLoggerFromLoggers(logger), retry.DefaultExponentialBackoffRetryPolicyConfiguration(), func() error {
started, subErr := m.HasJobStarted(ctx, job)
if subErr != nil {
return subErr
}
if started {
return nil
}
return notStartedError
}, fmt.Sprintf("Waiting for job [%v] to start...", jobName), notStartedError)
return
}

func (m *Manager) createMessagePaginator(ctx context.Context, job IAsynchronousJob) (paginator pagination.IStreamPaginatorAndPageFetcher, err error) {
paginator, err = m.messagesPaginatorFactory.Create(ctx, func(subCtx context.Context) (pagination.IStaticPageStream, error) {
return m.FetchJobMessagesFirstPage(subCtx, job)
Expand All @@ -74,6 +100,10 @@ func (m *Manager) WaitForJobCompletion(ctx context.Context, job IAsynchronousJob
_ = messageLogger.Close()
}
}()
err = m.waitForJobToStart(ctx, messageLogger, job)
if err != nil {
return
}
messagePaginator, err := m.createMessagePaginator(ctx, job)
if err != nil {
return
Expand Down Expand Up @@ -119,6 +149,36 @@ func (m *Manager) checkForMessageStreamExhaustion(ctx context.Context, paginator
}
}

func (m *Manager) HasJobStarted(ctx context.Context, job IAsynchronousJob) (started bool, err error) {
err = parallelisation.DetermineContextError(ctx)
if err != nil {
return
}
if job == nil {
err = fmt.Errorf("%w: missing job", commonerrors.ErrUndefined)
return
}
jobName, err := job.FetchName()
if err != nil {
return
}
jobType := job.FetchType()
jobStatus, resp, apierr := m.fetchJobStatusFunc(ctx, jobName)
if resp != nil {
_ = resp.Body.Close()
}
err = api.CheckAPICallSuccess(ctx, fmt.Sprintf("could not fetch %v [%v]'s status", jobType, jobName), resp, apierr)
if err != nil {
return
}
if jobStatus.GetDone() {
started = true
} else {
started = !jobStatus.GetQueued()
}
return
}

func (m *Manager) HasJobCompleted(ctx context.Context, job IAsynchronousJob) (completed bool, err error) {
err = parallelisation.DetermineContextError(ctx)
if err != nil {
Expand Down
31 changes: 24 additions & 7 deletions utils/job/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/goleak"

"github.com/ARM-software/embedded-development-services-client-utils/utils/job/jobtest"
"github.com/ARM-software/embedded-development-services-client-utils/utils/logging"
"github.com/ARM-software/embedded-development-services-client-utils/utils/messages"
pagination2 "github.com/ARM-software/embedded-development-services-client-utils/utils/pagination"
Expand All @@ -41,17 +42,22 @@ func TestManager_HasJobCompleted(t *testing.T) {
expectedError: commonerrors.ErrUndefined,
},
{
jobFunc: NewMockFailedAsynchronousJob,
jobFunc: mapFunc(jobtest.NewMockFailedAsynchronousJob),
expectCompleted: true,
expectedError: commonerrors.ErrInvalid,
},
{
jobFunc: NewMockUndoneAsynchronousJob,
jobFunc: mapFunc(jobtest.NewMockUndoneAsynchronousJob),
expectCompleted: false,
expectedError: nil,
},
{
jobFunc: NewMockSuccessfulAsynchronousJob,
jobFunc: mapFunc(jobtest.NewMockQueuedAsynchronousJob),
expectCompleted: false,
expectedError: nil,
},
{
jobFunc: mapFunc(jobtest.NewMockSuccessfulAsynchronousJob),
expectCompleted: true,
expectedError: nil,
},
Expand Down Expand Up @@ -92,11 +98,11 @@ func TestManager_checkForMessageStreamExhaustion(t *testing.T) {
expectedError: commonerrors.ErrUndefined,
},
{
jobFunc: NewMockFailedAsynchronousJob,
jobFunc: mapFunc(jobtest.NewMockFailedAsynchronousJob),
expectedError: nil,
},
{
jobFunc: NewMockSuccessfulAsynchronousJob,
jobFunc: mapFunc(jobtest.NewMockSuccessfulAsynchronousJob),
expectedError: nil,
},
}
Expand Down Expand Up @@ -132,18 +138,29 @@ func TestManager_checkForMessageStreamExhaustion(t *testing.T) {
}
}

func mapFunc(f func() (*jobtest.MockAsynchronousJob, error)) func() (IAsynchronousJob, error) {
return func() (IAsynchronousJob, error) {
job, err := f()
return job, err
}
}

func TestManager_WaitForJobCompletion(t *testing.T) {
defer goleak.VerifyNone(t)
tests := []struct {
jobFunc func() (IAsynchronousJob, error)
expectedError error
}{
{
jobFunc: NewMockFailedAsynchronousJob,
jobFunc: mapFunc(jobtest.NewMockFailedAsynchronousJob),
expectedError: commonerrors.ErrInvalid,
},
{
jobFunc: NewMockSuccessfulAsynchronousJob,
jobFunc: mapFunc(jobtest.NewMockQueuedAsynchronousJob),
expectedError: commonerrors.ErrCondition,
},
{
jobFunc: mapFunc(jobtest.NewMockSuccessfulAsynchronousJob),
expectedError: nil,
},
}
Expand Down
29 changes: 29 additions & 0 deletions utils/mocks/mock_job.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 739260a

Please sign in to comment.