Skip to content

Commit

Permalink
Add result struct to activity returns
Browse files Browse the repository at this point in the history
- Change tests so that they expect two nil returns instead of one.
  • Loading branch information
Diogenesoftoronto committed Mar 8, 2024
1 parent 071e45a commit d6106f0
Show file tree
Hide file tree
Showing 12 changed files with 144 additions and 110 deletions.
8 changes: 5 additions & 3 deletions internal/am/delete_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,21 @@ type DeleteTransferActivity struct {
logger logr.Logger
}

type DeleteTransferActivityResult struct{}

func NewDeleteTransferActivity(logger logr.Logger, client sftp.Client) *DeleteTransferActivity {
return &DeleteTransferActivity{client: client, logger: logger}
}

func (a *DeleteTransferActivity) Execute(ctx context.Context, params *DeleteTransferActivityParams) error {
func (a *DeleteTransferActivity) Execute(ctx context.Context, params *DeleteTransferActivityParams) (*DeleteTransferActivityResult, error) {
a.logger.V(1).Info("Execute DeleteTransferActivity",
"destination", params.Destination,
)

err := a.client.Delete(ctx, params.Destination)
if err != nil {
return fmt.Errorf("delete transfer: path: %q: %v", params.Destination, err)
return nil, fmt.Errorf("delete transfer: path: %q: %v", params.Destination, err)
}

return nil
return nil, nil
}
20 changes: 11 additions & 9 deletions internal/storage/activities/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,47 +11,49 @@ type CopyToPermanentLocationActivity struct {
storagesvc storage.Service
}

type CopyToPermanentLocationActivityResult struct{}

func NewCopyToPermanentLocationActivity(storagesvc storage.Service) *CopyToPermanentLocationActivity {
return &CopyToPermanentLocationActivity{storagesvc: storagesvc}
}

func (a *CopyToPermanentLocationActivity) Execute(ctx context.Context, params *storage.CopyToPermanentLocationActivityParams) error {
func (a *CopyToPermanentLocationActivity) Execute(ctx context.Context, params *storage.CopyToPermanentLocationActivityParams) (*CopyToPermanentLocationActivityResult, error) {

Check warning on line 20 in internal/storage/activities/copy.go

View check run for this annotation

Codecov / codecov/patch

internal/storage/activities/copy.go#L20

Added line #L20 was not covered by tests
p, err := a.storagesvc.ReadPackage(ctx, params.AIPID)
if err != nil {
return err
return nil, err

Check warning on line 23 in internal/storage/activities/copy.go

View check run for this annotation

Codecov / codecov/patch

internal/storage/activities/copy.go#L23

Added line #L23 was not covered by tests
}

reader, err := a.storagesvc.PackageReader(ctx, p)
if err != nil {
return err
return nil, err

Check warning on line 28 in internal/storage/activities/copy.go

View check run for this annotation

Codecov / codecov/patch

internal/storage/activities/copy.go#L28

Added line #L28 was not covered by tests
}
defer reader.Close()

l, err := a.storagesvc.Location(ctx, params.LocationID)
if err != nil {
return err
return nil, err

Check warning on line 34 in internal/storage/activities/copy.go

View check run for this annotation

Codecov / codecov/patch

internal/storage/activities/copy.go#L34

Added line #L34 was not covered by tests
}

bucket, err := l.OpenBucket(ctx)
if err != nil {
return err
return nil, err

Check warning on line 39 in internal/storage/activities/copy.go

View check run for this annotation

Codecov / codecov/patch

internal/storage/activities/copy.go#L39

Added line #L39 was not covered by tests
}
defer bucket.Close()

writer, err := bucket.NewWriter(ctx, params.AIPID.String(), nil)
if err != nil {
return err
return nil, err

Check warning on line 45 in internal/storage/activities/copy.go

View check run for this annotation

Codecov / codecov/patch

internal/storage/activities/copy.go#L45

Added line #L45 was not covered by tests
}

_, copyErr := io.Copy(writer, reader)
closeErr := writer.Close()

if copyErr != nil {
return copyErr
return nil, copyErr

Check warning on line 52 in internal/storage/activities/copy.go

View check run for this annotation

Codecov / codecov/patch

internal/storage/activities/copy.go#L52

Added line #L52 was not covered by tests
}
if closeErr != nil {
return closeErr
return nil, closeErr

Check warning on line 55 in internal/storage/activities/copy.go

View check run for this annotation

Codecov / codecov/patch

internal/storage/activities/copy.go#L55

Added line #L55 was not covered by tests
}

return nil
return nil, nil

Check warning on line 58 in internal/storage/activities/copy.go

View check run for this annotation

Codecov / codecov/patch

internal/storage/activities/copy.go#L58

Added line #L58 was not covered by tests
}
2 changes: 1 addition & 1 deletion internal/storage/workflows/move_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestStorageMoveWorkflow(t *testing.T) {

// Worker activities
env.RegisterActivityWithOptions(activities.NewCopyToPermanentLocationActivity(storagesvc).Execute, temporalsdk_activity.RegisterOptions{Name: storage.CopyToPermanentLocationActivityName})
env.OnActivity(storage.CopyToPermanentLocationActivityName, mock.Anything, mock.Anything).Return(nil)
env.OnActivity(storage.CopyToPermanentLocationActivityName, mock.Anything, mock.Anything).Return(nil, nil)

env.ExecuteWorkflow(
NewStorageMoveWorkflow(storagesvc).Execute,
Expand Down
10 changes: 6 additions & 4 deletions internal/workflow/activities/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ type CleanUpActivityParams struct {
FullPath string
}

func (a *CleanUpActivity) Execute(ctx context.Context, params *CleanUpActivityParams) error {
type CleanUpActivityResult struct{}

func (a *CleanUpActivity) Execute(ctx context.Context, params *CleanUpActivityParams) (*CleanUpActivityResult, error) {

Check warning on line 22 in internal/workflow/activities/cleanup.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/cleanup.go#L22

Added line #L22 was not covered by tests
if params == nil || params.FullPath == "" {
return fmt.Errorf("error processing parameters: missing or empty")
return nil, fmt.Errorf("error processing parameters: missing or empty")

Check warning on line 24 in internal/workflow/activities/cleanup.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/cleanup.go#L24

Added line #L24 was not covered by tests
}

if err := os.RemoveAll(params.FullPath); err != nil {
return fmt.Errorf("error removing transfer directory: %v", err)
return nil, fmt.Errorf("error removing transfer directory: %v", err)

Check warning on line 28 in internal/workflow/activities/cleanup.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/cleanup.go#L28

Added line #L28 was not covered by tests
}

return nil
return nil, nil

Check warning on line 31 in internal/workflow/activities/cleanup.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/cleanup.go#L31

Added line #L31 was not covered by tests
}
6 changes: 4 additions & 2 deletions internal/workflow/activities/delete_original.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ type DeleteOriginalActivity struct {
wsvc watcher.Service
}

type DeleteOriginalActivityResult struct{}

func NewDeleteOriginalActivity(wsvc watcher.Service) *DeleteOriginalActivity {
return &DeleteOriginalActivity{wsvc: wsvc}
}

func (a *DeleteOriginalActivity) Execute(ctx context.Context, watcherName, key string) error {
return a.wsvc.Delete(ctx, watcherName, key)
func (a *DeleteOriginalActivity) Execute(ctx context.Context, watcherName, key string) (*DeleteOriginalActivityResult, error) {
return nil, a.wsvc.Delete(ctx, watcherName, key)

Check warning on line 20 in internal/workflow/activities/delete_original.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/delete_original.go#L19-L20

Added lines #L19 - L20 were not covered by tests
}
6 changes: 4 additions & 2 deletions internal/workflow/activities/dispose_original.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ type DisposeOriginalActivity struct {
wsvc watcher.Service
}

type DisposeOriginalActivityResult struct{}

func NewDisposeOriginalActivity(wsvc watcher.Service) *DisposeOriginalActivity {
return &DisposeOriginalActivity{wsvc: wsvc}
}

func (a *DisposeOriginalActivity) Execute(ctx context.Context, watcherName, completedDir, key string) error {
return a.wsvc.Dispose(ctx, watcherName, key)
func (a *DisposeOriginalActivity) Execute(ctx context.Context, watcherName, completedDir, key string) (*DisposeOriginalActivityResult, error) {
return nil, a.wsvc.Dispose(ctx, watcherName, key)

Check warning on line 20 in internal/workflow/activities/dispose_original.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/dispose_original.go#L19-L20

Added lines #L19 - L20 were not covered by tests
}
18 changes: 12 additions & 6 deletions internal/workflow/activities/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ type MoveToPermanentStorageActivityParams struct {
LocationID uuid.UUID
}

type MoveToPermanentStorageActivityResult struct{}

type MoveToPermanentStorageActivity struct {
storageClient *goastorage.Client
}
Expand All @@ -26,7 +28,7 @@ func NewMoveToPermanentStorageActivity(storageClient *goastorage.Client) *MoveTo
}
}

func (a *MoveToPermanentStorageActivity) Execute(ctx context.Context, params *MoveToPermanentStorageActivityParams) error {
func (a *MoveToPermanentStorageActivity) Execute(ctx context.Context, params *MoveToPermanentStorageActivityParams) (*MoveToPermanentStorageActivityResult, error) {

Check warning on line 31 in internal/workflow/activities/storage.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/storage.go#L31

Added line #L31 was not covered by tests
childCtx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()

Expand All @@ -35,7 +37,7 @@ func (a *MoveToPermanentStorageActivity) Execute(ctx context.Context, params *Mo
LocationID: params.LocationID,
})

return err
return nil, err

Check warning on line 40 in internal/workflow/activities/storage.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/storage.go#L40

Added line #L40 was not covered by tests
}

type PollMoveToPermanentStorageActivityParams struct {
Expand All @@ -46,13 +48,15 @@ type PollMoveToPermanentStorageActivity struct {
storageClient *goastorage.Client
}

type PollMoveToPermanentStorageActivityResult struct{}

func NewPollMoveToPermanentStorageActivity(storageClient *goastorage.Client) *PollMoveToPermanentStorageActivity {
return &PollMoveToPermanentStorageActivity{
storageClient: storageClient,
}
}

func (a *PollMoveToPermanentStorageActivity) Execute(ctx context.Context, params *PollMoveToPermanentStorageActivityParams) error {
func (a *PollMoveToPermanentStorageActivity) Execute(ctx context.Context, params *PollMoveToPermanentStorageActivityParams) (*PollMoveToPermanentStorageActivityResult, error) {

Check warning on line 59 in internal/workflow/activities/storage.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/storage.go#L59

Added line #L59 was not covered by tests
var g run.Group

{
Expand Down Expand Up @@ -105,7 +109,7 @@ func (a *PollMoveToPermanentStorageActivity) Execute(ctx context.Context, params
}

err := g.Run()
return err
return nil, err

Check warning on line 112 in internal/workflow/activities/storage.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/storage.go#L112

Added line #L112 was not covered by tests
}

type RejectPackageActivityParams struct {
Expand All @@ -116,19 +120,21 @@ type RejectPackageActivity struct {
storageClient *goastorage.Client
}

type RejectPackageActivityResult struct{}

func NewRejectPackageActivity(storageClient *goastorage.Client) *RejectPackageActivity {
return &RejectPackageActivity{
storageClient: storageClient,
}
}

func (a *RejectPackageActivity) Execute(ctx context.Context, params *RejectPackageActivityParams) error {
func (a *RejectPackageActivity) Execute(ctx context.Context, params *RejectPackageActivityParams) (*RejectPackageActivityResult, error) {

Check warning on line 131 in internal/workflow/activities/storage.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/storage.go#L131

Added line #L131 was not covered by tests
childCtx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()

err := a.storageClient.Reject(childCtx, &goastorage.RejectPayload{
AipID: params.AIPID,
})

return err
return nil, err

Check warning on line 139 in internal/workflow/activities/storage.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/storage.go#L139

Added line #L139 was not covered by tests
}
20 changes: 11 additions & 9 deletions internal/workflow/activities/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,57 +20,59 @@ type UploadActivity struct {
storageClient *goastorage.Client
}

type UploadActivityResult struct{}

func NewUploadActivity(storageClient *goastorage.Client) *UploadActivity {
return &UploadActivity{
storageClient: storageClient,
}
}

func (a *UploadActivity) Execute(ctx context.Context, params *UploadActivityParams) error {
func (a *UploadActivity) Execute(ctx context.Context, params *UploadActivityParams) (*UploadActivityResult, error) {
childCtx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
res, err := a.storageClient.Submit(childCtx, &goastorage.SubmitPayload{
AipID: params.AIPID,
Name: params.Name,
})
if err != nil {
return err
return nil, err

Check warning on line 39 in internal/workflow/activities/upload.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/upload.go#L39

Added line #L39 was not covered by tests
}

// Upload to MinIO using the upload pre-signed URL.
{
f, err := os.Open(params.AIPPath)
if err != nil {
return err
return nil, err

Check warning on line 46 in internal/workflow/activities/upload.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/upload.go#L46

Added line #L46 was not covered by tests
}
defer f.Close() //#nosec G307 -- Errors returned by Close() here do not require specific handling.

uploadReq, err := http.NewRequestWithContext(ctx, http.MethodPut, res.URL, f)
if err != nil {
return nil
return nil, nil

Check warning on line 52 in internal/workflow/activities/upload.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/upload.go#L52

Added line #L52 was not covered by tests
}
fi, err := f.Stat()
if err != nil {
return err
return nil, err

Check warning on line 56 in internal/workflow/activities/upload.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/upload.go#L56

Added line #L56 was not covered by tests
}
uploadReq.ContentLength = fi.Size()
if err != nil {
return err
return nil, err

Check warning on line 60 in internal/workflow/activities/upload.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/upload.go#L60

Added line #L60 was not covered by tests
}

minioClient := &http.Client{}
resp, err := minioClient.Do(uploadReq)
if err != nil {
return err
return nil, err

Check warning on line 66 in internal/workflow/activities/upload.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/upload.go#L66

Added line #L66 was not covered by tests
}
if resp.StatusCode != http.StatusOK {
return errors.New("unexpected status code returned")
return nil, errors.New("unexpected status code returned")

Check warning on line 69 in internal/workflow/activities/upload.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/upload.go#L69

Added line #L69 was not covered by tests
}
}

childCtx, cancel = context.WithTimeout(ctx, time.Second*5)
defer cancel()
err = a.storageClient.Update(childCtx, &goastorage.UpdatePayload{AipID: params.AIPID})

return err
return nil, err
}
4 changes: 2 additions & 2 deletions internal/workflow/activities/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestUploadActivity(t *testing.T) {

activity := NewUploadActivity(storageClient)

err := activity.Execute(context.Background(), &UploadActivityParams{
_, err := activity.Execute(context.Background(), &UploadActivityParams{
AIPPath: tmpDir.Join("aip.7z"),
AIPID: uuid.New().String(),
Name: "aip.7z",
Expand Down Expand Up @@ -173,7 +173,7 @@ func TestUploadActivity(t *testing.T) {

activity := NewUploadActivity(storageClient)

err := activity.Execute(context.Background(), &UploadActivityParams{
_, err := activity.Execute(context.Background(), &UploadActivityParams{
AIPPath: tmpDir.Join("aip.7z"),
AIPID: uuid.New().String(),
Name: "aip.7z",
Expand Down
Loading

0 comments on commit d6106f0

Please sign in to comment.