diff --git a/internal/am/delete_transfer.go b/internal/am/delete_transfer.go index 9e4a5cda8..8601b44e2 100644 --- a/internal/am/delete_transfer.go +++ b/internal/am/delete_transfer.go @@ -20,19 +20,21 @@ type DeleteTransferActivity struct { logger logr.Logger } +type DeleteTransferActivityResult struct{} + func NewDeleteTransferActivity(logger logr.Logger, client sftp.Client) *DeleteTransferActivity { return &DeleteTransferActivity{client: client, logger: logger} } -func (a *DeleteTransferActivity) Execute(ctx context.Context, params *DeleteTransferActivityParams) error { +func (a *DeleteTransferActivity) Execute(ctx context.Context, params *DeleteTransferActivityParams) (*DeleteTransferActivityResult, error) { a.logger.V(1).Info("Execute DeleteTransferActivity", "destination", params.Destination, ) err := a.client.Delete(ctx, params.Destination) if err != nil { - return fmt.Errorf("delete transfer: path: %q: %v", params.Destination, err) + return nil, fmt.Errorf("delete transfer: path: %q: %v", params.Destination, err) } - return nil + return nil, nil } diff --git a/internal/storage/activities/copy.go b/internal/storage/activities/copy.go index b7158be5f..d28905c0b 100644 --- a/internal/storage/activities/copy.go +++ b/internal/storage/activities/copy.go @@ -11,47 +11,49 @@ type CopyToPermanentLocationActivity struct { storagesvc storage.Service } +type CopyToPermanentLocationActivityResult struct{} + func NewCopyToPermanentLocationActivity(storagesvc storage.Service) *CopyToPermanentLocationActivity { return &CopyToPermanentLocationActivity{storagesvc: storagesvc} } -func (a *CopyToPermanentLocationActivity) Execute(ctx context.Context, params *storage.CopyToPermanentLocationActivityParams) error { +func (a *CopyToPermanentLocationActivity) Execute(ctx context.Context, params *storage.CopyToPermanentLocationActivityParams) (*CopyToPermanentLocationActivityResult, error) { p, err := a.storagesvc.ReadPackage(ctx, params.AIPID) if err != nil { - return err + return nil, err } reader, err := a.storagesvc.PackageReader(ctx, p) if err != nil { - return err + return nil, err } defer reader.Close() l, err := a.storagesvc.Location(ctx, params.LocationID) if err != nil { - return err + return nil, err } bucket, err := l.OpenBucket(ctx) if err != nil { - return err + return nil, err } defer bucket.Close() writer, err := bucket.NewWriter(ctx, params.AIPID.String(), nil) if err != nil { - return err + return nil, err } _, copyErr := io.Copy(writer, reader) closeErr := writer.Close() if copyErr != nil { - return copyErr + return nil, copyErr } if closeErr != nil { - return closeErr + return nil, closeErr } - return nil + return nil, nil } diff --git a/internal/storage/workflows/move_test.go b/internal/storage/workflows/move_test.go index 1af0c8706..3e40bb170 100644 --- a/internal/storage/workflows/move_test.go +++ b/internal/storage/workflows/move_test.go @@ -33,7 +33,7 @@ func TestStorageMoveWorkflow(t *testing.T) { // Worker activities env.RegisterActivityWithOptions(activities.NewCopyToPermanentLocationActivity(storagesvc).Execute, temporalsdk_activity.RegisterOptions{Name: storage.CopyToPermanentLocationActivityName}) - env.OnActivity(storage.CopyToPermanentLocationActivityName, mock.Anything, mock.Anything).Return(nil) + env.OnActivity(storage.CopyToPermanentLocationActivityName, mock.Anything, mock.Anything).Return(nil, nil) env.ExecuteWorkflow( NewStorageMoveWorkflow(storagesvc).Execute, diff --git a/internal/workflow/activities/cleanup.go b/internal/workflow/activities/cleanup.go index de8cf7c7b..6d8a0dcc9 100644 --- a/internal/workflow/activities/cleanup.go +++ b/internal/workflow/activities/cleanup.go @@ -17,14 +17,16 @@ type CleanUpActivityParams struct { FullPath string } -func (a *CleanUpActivity) Execute(ctx context.Context, params *CleanUpActivityParams) error { +type CleanUpActivityResult struct{} + +func (a *CleanUpActivity) Execute(ctx context.Context, params *CleanUpActivityParams) (*CleanUpActivityResult, error) { if params == nil || params.FullPath == "" { - return fmt.Errorf("error processing parameters: missing or empty") + return nil, fmt.Errorf("error processing parameters: missing or empty") } if err := os.RemoveAll(params.FullPath); err != nil { - return fmt.Errorf("error removing transfer directory: %v", err) + return nil, fmt.Errorf("error removing transfer directory: %v", err) } - return nil + return nil, nil } diff --git a/internal/workflow/activities/delete_original.go b/internal/workflow/activities/delete_original.go index b6a4e607d..b23f1a9fb 100644 --- a/internal/workflow/activities/delete_original.go +++ b/internal/workflow/activities/delete_original.go @@ -10,10 +10,12 @@ type DeleteOriginalActivity struct { wsvc watcher.Service } +type DeleteOriginalActivityResult struct{} + func NewDeleteOriginalActivity(wsvc watcher.Service) *DeleteOriginalActivity { return &DeleteOriginalActivity{wsvc: wsvc} } -func (a *DeleteOriginalActivity) Execute(ctx context.Context, watcherName, key string) error { - return a.wsvc.Delete(ctx, watcherName, key) +func (a *DeleteOriginalActivity) Execute(ctx context.Context, watcherName, key string) (*DeleteOriginalActivityResult, error) { + return nil, a.wsvc.Delete(ctx, watcherName, key) } diff --git a/internal/workflow/activities/dispose_original.go b/internal/workflow/activities/dispose_original.go index fbe9f23a5..7cedd96d1 100644 --- a/internal/workflow/activities/dispose_original.go +++ b/internal/workflow/activities/dispose_original.go @@ -10,10 +10,12 @@ type DisposeOriginalActivity struct { wsvc watcher.Service } +type DisposeOriginalActivityResult struct{} + func NewDisposeOriginalActivity(wsvc watcher.Service) *DisposeOriginalActivity { return &DisposeOriginalActivity{wsvc: wsvc} } -func (a *DisposeOriginalActivity) Execute(ctx context.Context, watcherName, completedDir, key string) error { - return a.wsvc.Dispose(ctx, watcherName, key) +func (a *DisposeOriginalActivity) Execute(ctx context.Context, watcherName, completedDir, key string) (*DisposeOriginalActivityResult, error) { + return nil, a.wsvc.Dispose(ctx, watcherName, key) } diff --git a/internal/workflow/activities/storage.go b/internal/workflow/activities/storage.go index 801577da9..3923b0c56 100644 --- a/internal/workflow/activities/storage.go +++ b/internal/workflow/activities/storage.go @@ -16,6 +16,8 @@ type MoveToPermanentStorageActivityParams struct { LocationID uuid.UUID } +type MoveToPermanentStorageActivityResult struct{} + type MoveToPermanentStorageActivity struct { storageClient *goastorage.Client } @@ -26,7 +28,7 @@ func NewMoveToPermanentStorageActivity(storageClient *goastorage.Client) *MoveTo } } -func (a *MoveToPermanentStorageActivity) Execute(ctx context.Context, params *MoveToPermanentStorageActivityParams) error { +func (a *MoveToPermanentStorageActivity) Execute(ctx context.Context, params *MoveToPermanentStorageActivityParams) (*MoveToPermanentStorageActivityResult, error) { childCtx, cancel := context.WithTimeout(ctx, time.Second*5) defer cancel() @@ -35,7 +37,7 @@ func (a *MoveToPermanentStorageActivity) Execute(ctx context.Context, params *Mo LocationID: params.LocationID, }) - return err + return nil, err } type PollMoveToPermanentStorageActivityParams struct { @@ -46,13 +48,15 @@ type PollMoveToPermanentStorageActivity struct { storageClient *goastorage.Client } +type PollMoveToPermanentStorageActivityResult struct{} + func NewPollMoveToPermanentStorageActivity(storageClient *goastorage.Client) *PollMoveToPermanentStorageActivity { return &PollMoveToPermanentStorageActivity{ storageClient: storageClient, } } -func (a *PollMoveToPermanentStorageActivity) Execute(ctx context.Context, params *PollMoveToPermanentStorageActivityParams) error { +func (a *PollMoveToPermanentStorageActivity) Execute(ctx context.Context, params *PollMoveToPermanentStorageActivityParams) (*PollMoveToPermanentStorageActivityResult, error) { var g run.Group { @@ -105,7 +109,7 @@ func (a *PollMoveToPermanentStorageActivity) Execute(ctx context.Context, params } err := g.Run() - return err + return nil, err } type RejectPackageActivityParams struct { @@ -116,13 +120,15 @@ type RejectPackageActivity struct { storageClient *goastorage.Client } +type RejectPackageActivityResult struct{} + func NewRejectPackageActivity(storageClient *goastorage.Client) *RejectPackageActivity { return &RejectPackageActivity{ storageClient: storageClient, } } -func (a *RejectPackageActivity) Execute(ctx context.Context, params *RejectPackageActivityParams) error { +func (a *RejectPackageActivity) Execute(ctx context.Context, params *RejectPackageActivityParams) (*RejectPackageActivityResult, error) { childCtx, cancel := context.WithTimeout(ctx, time.Second*5) defer cancel() @@ -130,5 +136,5 @@ func (a *RejectPackageActivity) Execute(ctx context.Context, params *RejectPacka AipID: params.AIPID, }) - return err + return nil, err } diff --git a/internal/workflow/activities/upload.go b/internal/workflow/activities/upload.go index 2e8cab72c..481cb5712 100644 --- a/internal/workflow/activities/upload.go +++ b/internal/workflow/activities/upload.go @@ -20,13 +20,15 @@ type UploadActivity struct { storageClient *goastorage.Client } +type UploadActivityResult struct{} + func NewUploadActivity(storageClient *goastorage.Client) *UploadActivity { return &UploadActivity{ storageClient: storageClient, } } -func (a *UploadActivity) Execute(ctx context.Context, params *UploadActivityParams) error { +func (a *UploadActivity) Execute(ctx context.Context, params *UploadActivityParams) (*UploadActivityResult, error) { childCtx, cancel := context.WithTimeout(ctx, time.Second*5) defer cancel() res, err := a.storageClient.Submit(childCtx, &goastorage.SubmitPayload{ @@ -34,37 +36,37 @@ func (a *UploadActivity) Execute(ctx context.Context, params *UploadActivityPara Name: params.Name, }) if err != nil { - return err + return nil, err } // Upload to MinIO using the upload pre-signed URL. { f, err := os.Open(params.AIPPath) if err != nil { - return err + return nil, err } defer f.Close() //#nosec G307 -- Errors returned by Close() here do not require specific handling. uploadReq, err := http.NewRequestWithContext(ctx, http.MethodPut, res.URL, f) if err != nil { - return nil + return nil, nil } fi, err := f.Stat() if err != nil { - return err + return nil, err } uploadReq.ContentLength = fi.Size() if err != nil { - return err + return nil, err } minioClient := &http.Client{} resp, err := minioClient.Do(uploadReq) if err != nil { - return err + return nil, err } if resp.StatusCode != http.StatusOK { - return errors.New("unexpected status code returned") + return nil, errors.New("unexpected status code returned") } } @@ -72,5 +74,5 @@ func (a *UploadActivity) Execute(ctx context.Context, params *UploadActivityPara defer cancel() err = a.storageClient.Update(childCtx, &goastorage.UpdatePayload{AipID: params.AIPID}) - return err + return nil, err } diff --git a/internal/workflow/activities/upload_test.go b/internal/workflow/activities/upload_test.go index f1455eb44..4b66f0914 100644 --- a/internal/workflow/activities/upload_test.go +++ b/internal/workflow/activities/upload_test.go @@ -128,7 +128,7 @@ func TestUploadActivity(t *testing.T) { activity := NewUploadActivity(storageClient) - err := activity.Execute(context.Background(), &UploadActivityParams{ + _, err := activity.Execute(context.Background(), &UploadActivityParams{ AIPPath: tmpDir.Join("aip.7z"), AIPID: uuid.New().String(), Name: "aip.7z", @@ -173,7 +173,7 @@ func TestUploadActivity(t *testing.T) { activity := NewUploadActivity(storageClient) - err := activity.Execute(context.Background(), &UploadActivityParams{ + _, err := activity.Execute(context.Background(), &UploadActivityParams{ AIPPath: tmpDir.Join("aip.7z"), AIPID: uuid.New().String(), Name: "aip.7z", diff --git a/internal/workflow/local_activities.go b/internal/workflow/local_activities.go index 41b470a00..b65eaebbc 100644 --- a/internal/workflow/local_activities.go +++ b/internal/workflow/local_activities.go @@ -43,7 +43,9 @@ type updatePackageLocalActivityParams struct { Status package_.Status } -func updatePackageLocalActivity(ctx context.Context, logger logr.Logger, pkgsvc package_.Service, params *updatePackageLocalActivityParams) error { +type updatePackageLocalActivityResult struct{} + +func updatePackageLocalActivity(ctx context.Context, logger logr.Logger, pkgsvc package_.Service, params *updatePackageLocalActivityParams) (*updatePackageLocalActivityResult, error) { info := temporalsdk_activity.GetInfo(ctx) err := pkgsvc.UpdateWorkflowStatus( @@ -58,22 +60,28 @@ func updatePackageLocalActivity(ctx context.Context, logger logr.Logger, pkgsvc ) if err != nil { logger.Error(err, "Error updating package") - return err + return nil, err } - return nil + return nil, nil } -func setStatusInProgressLocalActivity(ctx context.Context, pkgsvc package_.Service, pkgID uint, startedAt time.Time) error { - return pkgsvc.SetStatusInProgress(ctx, pkgID, startedAt) +type setStatusInProgressLocalActivityResult struct{} + +func setStatusInProgressLocalActivity(ctx context.Context, pkgsvc package_.Service, pkgID uint, startedAt time.Time) (*setStatusInProgressLocalActivityResult, error) { + return nil, pkgsvc.SetStatusInProgress(ctx, pkgID, startedAt) } -func setStatusLocalActivity(ctx context.Context, pkgsvc package_.Service, pkgID uint, status package_.Status) error { - return pkgsvc.SetStatus(ctx, pkgID, status) +type setStatusLocalActivityResult struct{} + +func setStatusLocalActivity(ctx context.Context, pkgsvc package_.Service, pkgID uint, status package_.Status) (*setStatusLocalActivityResult, error) { + return nil, pkgsvc.SetStatus(ctx, pkgID, status) } -func setLocationIDLocalActivity(ctx context.Context, pkgsvc package_.Service, pkgID uint, locationID uuid.UUID) error { - return pkgsvc.SetLocationID(ctx, pkgID, locationID) +type setLocationIDLocalActivityResult struct{} + +func setLocationIDLocalActivity(ctx context.Context, pkgsvc package_.Service, pkgID uint, locationID uuid.UUID) (*setLocationIDLocalActivityResult, error) { + return nil, pkgsvc.SetLocationID(ctx, pkgID, locationID) } type saveLocationMovePreservationActionLocalActivityParams struct { @@ -86,7 +94,9 @@ type saveLocationMovePreservationActionLocalActivityParams struct { CompletedAt time.Time } -func saveLocationMovePreservationActionLocalActivity(ctx context.Context, pkgsvc package_.Service, params *saveLocationMovePreservationActionLocalActivityParams) error { +type saveLocationMovePreservationActionLocalActivityResult struct{} + +func saveLocationMovePreservationActionLocalActivity(ctx context.Context, pkgsvc package_.Service, params *saveLocationMovePreservationActionLocalActivityParams) (*saveLocationMovePreservationActionLocalActivityResult, error) { paID, err := createPreservationActionLocalActivity(ctx, pkgsvc, &createPreservationActionLocalActivityParams{ WorkflowID: params.WorkflowID, Type: params.Type, @@ -96,7 +106,7 @@ func saveLocationMovePreservationActionLocalActivity(ctx context.Context, pkgsvc PackageID: params.PackageID, }) if err != nil { - return err + return nil, err } actionStatusToTaskStatus := map[package_.PreservationActionStatus]package_.PreservationTaskStatus{ @@ -116,7 +126,7 @@ func saveLocationMovePreservationActionLocalActivity(ctx context.Context, pkgsvc pt.StartedAt.Time = params.StartedAt pt.CompletedAt.Time = params.CompletedAt - return pkgsvc.CreatePreservationTask(ctx, &pt) + return nil, pkgsvc.CreatePreservationTask(ctx, &pt) } type createPreservationActionLocalActivityParams struct { @@ -145,8 +155,10 @@ func createPreservationActionLocalActivity(ctx context.Context, pkgsvc package_. return pa.ID, nil } -func setPreservationActionStatusLocalActivity(ctx context.Context, pkgsvc package_.Service, ID uint, status package_.PreservationActionStatus) error { - return pkgsvc.SetPreservationActionStatus(ctx, ID, status) +type setPreservationActionStatusLocalActivityResult struct{} + +func setPreservationActionStatusLocalActivity(ctx context.Context, pkgsvc package_.Service, ID uint, status package_.PreservationActionStatus) (*setPreservationActionStatusLocalActivityResult, error) { + return nil, pkgsvc.SetPreservationActionStatus(ctx, ID, status) } type completePreservationActionLocalActivityParams struct { @@ -155,8 +167,10 @@ type completePreservationActionLocalActivityParams struct { CompletedAt time.Time } -func completePreservationActionLocalActivity(ctx context.Context, pkgsvc package_.Service, params *completePreservationActionLocalActivityParams) error { - return pkgsvc.CompletePreservationAction(ctx, params.PreservationActionID, params.Status, params.CompletedAt) +type completePreservationActionLocalActivityResult struct{} + +func completePreservationActionLocalActivity(ctx context.Context, pkgsvc package_.Service, params *completePreservationActionLocalActivityParams) (*completePreservationActionLocalActivityResult, error) { + return nil, pkgsvc.CompletePreservationAction(ctx, params.PreservationActionID, params.Status, params.CompletedAt) } type createPreservationTaskLocalActivityParams struct { @@ -194,6 +208,8 @@ type completePreservationTaskLocalActivityParams struct { Note *string } -func completePreservationTaskLocalActivity(ctx context.Context, pkgsvc package_.Service, params *completePreservationTaskLocalActivityParams) error { - return pkgsvc.CompletePreservationTask(ctx, params.ID, params.Status, params.CompletedAt, params.Note) +type completePreservationTaskLocalActivityResult struct{} + +func completePreservationTaskLocalActivity(ctx context.Context, pkgsvc package_.Service, params *completePreservationTaskLocalActivityParams) (*completePreservationTaskLocalActivityResult, error) { + return nil, pkgsvc.CompletePreservationTask(ctx, params.ID, params.Status, params.CompletedAt, params.Note) } diff --git a/internal/workflow/move_test.go b/internal/workflow/move_test.go index 0faf9d658..d7b98b699 100644 --- a/internal/workflow/move_test.go +++ b/internal/workflow/move_test.go @@ -63,7 +63,7 @@ func (s *MoveWorkflowTestSuite) TestSuccessfulMove() { locationID := uuid.MustParse("51328c02-2b63-47be-958e-e8088aa1a61f") // Package is set to in progress status. - s.env.OnActivity(setStatusLocalActivity, mock.Anything, mock.Anything, pkgID, package_.StatusInProgress).Return(nil) + s.env.OnActivity(setStatusLocalActivity, mock.Anything, mock.Anything, pkgID, package_.StatusInProgress).Return(nil, nil) // Move operation succeeds. s.env.OnActivity( @@ -73,7 +73,7 @@ func (s *MoveWorkflowTestSuite) TestSuccessfulMove() { AIPID: AIPID, LocationID: locationID, }, - ).Return(nil) + ).Return(nil, nil) // Polling of move operation succeeds. s.env.OnActivity( @@ -82,13 +82,13 @@ func (s *MoveWorkflowTestSuite) TestSuccessfulMove() { &activities.PollMoveToPermanentStorageActivityParams{ AIPID: AIPID, }, - ).Return(nil) + ).Return(nil, nil) // Package is set back to done status. - s.env.OnActivity(setStatusLocalActivity, mock.Anything, mock.Anything, pkgID, package_.StatusDone).Return(nil) + s.env.OnActivity(setStatusLocalActivity, mock.Anything, mock.Anything, pkgID, package_.StatusDone).Return(nil, nil) // Package location is set. - s.env.OnActivity(setLocationIDLocalActivity, mock.Anything, mock.Anything, pkgID, locationID).Return(nil) + s.env.OnActivity(setLocationIDLocalActivity, mock.Anything, mock.Anything, pkgID, locationID).Return(nil, nil) // Preservation action is created with successful status. s.env.OnActivity( @@ -96,7 +96,7 @@ func (s *MoveWorkflowTestSuite) TestSuccessfulMove() { mock.Anything, mock.Anything, mock.Anything, - ).Return(nil) + ).Return(nil, nil) s.env.ExecuteWorkflow( s.workflow.Execute, @@ -118,7 +118,7 @@ func (s *MoveWorkflowTestSuite) TestFailedMove() { locationID := uuid.MustParse("51328c02-2b63-47be-958e-e8088aa1a61f") // Package is set to in progress status. - s.env.OnActivity(setStatusLocalActivity, mock.Anything, mock.Anything, pkgID, package_.StatusInProgress).Return(nil) + s.env.OnActivity(setStatusLocalActivity, mock.Anything, mock.Anything, pkgID, package_.StatusInProgress).Return(nil, nil) // Move operation fails. s.env.OnActivity( @@ -128,10 +128,10 @@ func (s *MoveWorkflowTestSuite) TestFailedMove() { AIPID: AIPID, LocationID: locationID, }, - ).Return(errors.New("error moving package")) + ).Return(nil, errors.New("error moving package")) // Package is set back to done status. - s.env.OnActivity(setStatusLocalActivity, mock.Anything, mock.Anything, pkgID, package_.StatusDone).Return(nil) + s.env.OnActivity(setStatusLocalActivity, mock.Anything, mock.Anything, pkgID, package_.StatusDone).Return(nil, nil) // Preservation action is created with failed status. s.env.OnActivity( @@ -139,7 +139,7 @@ func (s *MoveWorkflowTestSuite) TestFailedMove() { mock.Anything, mock.Anything, mock.Anything, - ).Return(nil) + ).Return(nil, nil) s.env.ExecuteWorkflow( s.workflow.Execute, diff --git a/internal/workflow/processing_test.go b/internal/workflow/processing_test.go index 81c56f7fb..3805b9c68 100644 --- a/internal/workflow/processing_test.go +++ b/internal/workflow/processing_test.go @@ -153,7 +153,7 @@ func (s *ProcessingWorkflowTestSuite) TestPackageConfirmation() { // Activity mocks/assertions sequence s.env.OnActivity(createPackageLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(pkgID, nil) - s.env.OnActivity(setStatusInProgressLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + s.env.OnActivity(setStatusInProgressLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) s.env.OnActivity(createPreservationActionLocalActivity, mock.Anything, mock.Anything, mock.Anything).Return(uint(0), nil) s.env.OnActivity(activities.DownloadActivityName, sessionCtx, @@ -177,20 +177,20 @@ func (s *ProcessingWorkflowTestSuite) TestPackageConfirmation() { ) s.env.OnActivity(a3m.CreateAIPActivityName, mock.Anything, mock.Anything).Return(nil, nil) - s.env.OnActivity(updatePackageLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + s.env.OnActivity(updatePackageLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) s.env.OnActivity(createPreservationTaskLocalActivity, mock.Anything, mock.Anything, mock.Anything).Return(uint(0), nil) - s.env.OnActivity(activities.UploadActivityName, mock.Anything, mock.Anything).Return(nil) - s.env.OnActivity(setStatusLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) - s.env.OnActivity(setPreservationActionStatusLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) - s.env.OnActivity(completePreservationTaskLocalActivity, mock.Anything, mock.Anything, mock.Anything).Return(nil) - s.env.OnActivity(activities.MoveToPermanentStorageActivityName, mock.Anything, mock.Anything).Return(nil) - s.env.OnActivity(activities.PollMoveToPermanentStorageActivityName, mock.Anything, mock.Anything).Return(nil) - s.env.OnActivity(setLocationIDLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) - s.env.OnActivity(completePreservationActionLocalActivity, ctx, pkgsvc, mock.AnythingOfType("*workflow.completePreservationActionLocalActivityParams")).Return(nil).Once() - s.env.OnActivity(activities.CleanUpActivityName, sessionCtx, mock.AnythingOfType("*activities.CleanUpActivityParams")).Return(nil).Once() - s.env.OnActivity(activities.DeleteOriginalActivityName, sessionCtx, watcherName, key).Return(nil).Once() - s.env.OnActivity(updatePackageLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) - s.env.OnActivity(completePreservationActionLocalActivity, mock.Anything, mock.Anything, mock.Anything).Return(nil) + s.env.OnActivity(activities.UploadActivityName, mock.Anything, mock.Anything).Return(nil, nil) + s.env.OnActivity(setStatusLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) + s.env.OnActivity(setPreservationActionStatusLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil,nil) + s.env.OnActivity(completePreservationTaskLocalActivity, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) + s.env.OnActivity(activities.MoveToPermanentStorageActivityName, mock.Anything, mock.Anything).Return(nil, nil) + s.env.OnActivity(activities.PollMoveToPermanentStorageActivityName, mock.Anything, mock.Anything).Return(nil, nil) + s.env.OnActivity(setLocationIDLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) + s.env.OnActivity(completePreservationActionLocalActivity, ctx, pkgsvc, mock.AnythingOfType("*workflow.completePreservationActionLocalActivityParams")).Return(nil, nil).Once() + s.env.OnActivity(activities.CleanUpActivityName, sessionCtx, mock.AnythingOfType("*activities.CleanUpActivityParams")).Return(nil, nil).Once() + s.env.OnActivity(activities.DeleteOriginalActivityName, sessionCtx, watcherName, key).Return(nil, nil).Once() + s.env.OnActivity(updatePackageLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) + s.env.OnActivity(completePreservationActionLocalActivity, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) s.env.ExecuteWorkflow( s.workflow.Execute, @@ -226,7 +226,7 @@ func (s *ProcessingWorkflowTestSuite) TestAutoApprovedAIP() { pkgsvc, &createPackageLocalActivityParams{Key: key, Status: package_.StatusQueued}, ).Return(pkgID, nil).Once() - s.env.OnActivity(setStatusInProgressLocalActivity, ctx, pkgsvc, pkgID, mock.AnythingOfType("time.Time")).Return(nil).Once() + s.env.OnActivity(setStatusInProgressLocalActivity, ctx, pkgsvc, pkgID, mock.AnythingOfType("time.Time")).Return(nil, nil).Once() s.env.OnActivity(createPreservationActionLocalActivity, ctx, pkgsvc, mock.AnythingOfType("*workflow.createPreservationActionLocalActivityParams")).Return(uint(0), nil).Once() s.env.OnActivity(activities.DownloadActivityName, sessionCtx, @@ -250,18 +250,18 @@ func (s *ProcessingWorkflowTestSuite) TestAutoApprovedAIP() { ) s.env.OnActivity(a3m.CreateAIPActivityName, sessionCtx, mock.AnythingOfType("*a3m.CreateAIPActivityParams")).Return(nil, nil).Once() - s.env.OnActivity(updatePackageLocalActivity, ctx, logger, pkgsvc, mock.AnythingOfType("*workflow.updatePackageLocalActivityParams")).Return(nil).Times(2) + s.env.OnActivity(updatePackageLocalActivity, ctx, logger, pkgsvc, mock.AnythingOfType("*workflow.updatePackageLocalActivityParams")).Return(nil, nil).Times(2) s.env.OnActivity(createPreservationTaskLocalActivity, ctx, pkgsvc, mock.AnythingOfType("*workflow.createPreservationTaskLocalActivityParams")).Return(uint(0), nil).Once() - s.env.OnActivity(activities.UploadActivityName, sessionCtx, mock.AnythingOfType("*activities.UploadActivityParams")).Return(nil).Once() - s.env.OnActivity(setStatusLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Never() - s.env.OnActivity(setPreservationActionStatusLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Never() - s.env.OnActivity(completePreservationTaskLocalActivity, ctx, pkgsvc, mock.AnythingOfType("*workflow.completePreservationTaskLocalActivityParams")).Return(nil).Once() - s.env.OnActivity(activities.MoveToPermanentStorageActivityName, sessionCtx, mock.AnythingOfType("*activities.MoveToPermanentStorageActivityParams")).Return(nil).Once() - s.env.OnActivity(activities.PollMoveToPermanentStorageActivityName, sessionCtx, mock.AnythingOfType("*activities.PollMoveToPermanentStorageActivityParams")).Return(nil).Once() - s.env.OnActivity(setLocationIDLocalActivity, ctx, pkgsvc, pkgID, locationID).Return(nil).Once() - s.env.OnActivity(completePreservationActionLocalActivity, ctx, pkgsvc, mock.AnythingOfType("*workflow.completePreservationActionLocalActivityParams")).Return(nil).Once() - s.env.OnActivity(activities.CleanUpActivityName, sessionCtx, mock.AnythingOfType("*activities.CleanUpActivityParams")).Return(nil).Once() - s.env.OnActivity(activities.DeleteOriginalActivityName, sessionCtx, watcherName, key).Return(nil).Once() + s.env.OnActivity(activities.UploadActivityName, sessionCtx, mock.AnythingOfType("*activities.UploadActivityParams")).Return(nil, nil).Once() + s.env.OnActivity(setStatusLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil).Never() + s.env.OnActivity(setPreservationActionStatusLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil).Never() + s.env.OnActivity(completePreservationTaskLocalActivity, ctx, pkgsvc, mock.AnythingOfType("*workflow.completePreservationTaskLocalActivityParams")).Return(nil, nil).Once() + s.env.OnActivity(activities.MoveToPermanentStorageActivityName, sessionCtx, mock.AnythingOfType("*activities.MoveToPermanentStorageActivityParams")).Return(nil, nil).Once() + s.env.OnActivity(activities.PollMoveToPermanentStorageActivityName, sessionCtx, mock.AnythingOfType("*activities.PollMoveToPermanentStorageActivityParams")).Return(nil, nil).Once() + s.env.OnActivity(setLocationIDLocalActivity, ctx, pkgsvc, pkgID, locationID).Return(nil, nil).Once() + s.env.OnActivity(completePreservationActionLocalActivity, ctx, pkgsvc, mock.AnythingOfType("*workflow.completePreservationActionLocalActivityParams")).Return(nil, nil).Once() + s.env.OnActivity(activities.CleanUpActivityName, sessionCtx, mock.AnythingOfType("*activities.CleanUpActivityParams")).Return(nil, nil).Once() + s.env.OnActivity(activities.DeleteOriginalActivityName, sessionCtx, watcherName, key).Return(nil, nil).Once() s.env.ExecuteWorkflow( s.workflow.Execute, @@ -301,7 +301,7 @@ func (s *ProcessingWorkflowTestSuite) TestAMWorkflow() { &createPackageLocalActivityParams{Key: key, Status: package_.StatusQueued}, ).Return(pkgID, nil) - s.env.OnActivity(setStatusInProgressLocalActivity, ctx, pkgsvc, pkgID, mock.AnythingOfType("time.Time")).Return(nil) + s.env.OnActivity(setStatusInProgressLocalActivity, ctx, pkgsvc, pkgID, mock.AnythingOfType("time.Time")).Return(nil, nil) s.env.OnActivity(createPreservationActionLocalActivity, ctx, pkgsvc, mock.AnythingOfType("*workflow.createPreservationActionLocalActivityParams"), @@ -359,14 +359,14 @@ func (s *ProcessingWorkflowTestSuite) TestAMWorkflow() { s.env.OnActivity(am.DeleteTransferActivityName, sessionCtx, &am.DeleteTransferActivityParams{Destination: "transfer.zip"}, - ).Return(nil) + ).Return(nil, nil) // Post-preservation activities. - s.env.OnActivity(updatePackageLocalActivity, ctx, logger, pkgsvc, mock.AnythingOfType("*workflow.updatePackageLocalActivityParams")).Return(nil).Once() - s.env.OnActivity(setStatusLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Never() - s.env.OnActivity(completePreservationActionLocalActivity, ctx, pkgsvc, mock.AnythingOfType("*workflow.completePreservationActionLocalActivityParams")).Return(nil).Once() - s.env.OnActivity(activities.CleanUpActivityName, sessionCtx, mock.AnythingOfType("*activities.CleanUpActivityParams")).Return(nil).Once() - s.env.OnActivity(activities.DeleteOriginalActivityName, sessionCtx, watcherName, key).Return(nil).Once() + s.env.OnActivity(updatePackageLocalActivity, ctx, logger, pkgsvc, mock.AnythingOfType("*workflow.updatePackageLocalActivityParams")).Return(nil, nil).Once() + s.env.OnActivity(setStatusLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil).Never() + s.env.OnActivity(completePreservationActionLocalActivity, ctx, pkgsvc, mock.AnythingOfType("*workflow.completePreservationActionLocalActivityParams")).Return(nil, nil).Once() + s.env.OnActivity(activities.CleanUpActivityName, sessionCtx, mock.AnythingOfType("*activities.CleanUpActivityParams")).Return(nil, nil).Once() + s.env.OnActivity(activities.DeleteOriginalActivityName, sessionCtx, watcherName, key).Return(nil, nil).Once() s.env.ExecuteWorkflow( s.workflow.Execute, @@ -405,7 +405,7 @@ func (s *ProcessingWorkflowTestSuite) TestPackageRejection() { // Activity mocks/assertions sequence s.env.OnActivity(createPackageLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(pkgID, nil) - s.env.OnActivity(setStatusInProgressLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + s.env.OnActivity(setStatusInProgressLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) s.env.OnActivity(createPreservationActionLocalActivity, mock.Anything, mock.Anything, mock.Anything).Return(uint(0), nil) s.env.OnActivity(activities.DownloadActivityName, sessionCtx, @@ -429,17 +429,17 @@ func (s *ProcessingWorkflowTestSuite) TestPackageRejection() { ) s.env.OnActivity(a3m.CreateAIPActivityName, mock.Anything, mock.Anything).Return(nil, nil) - s.env.OnActivity(updatePackageLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) - s.env.OnActivity(completePreservationTaskLocalActivity, mock.Anything, mock.Anything, mock.Anything).Return(nil) - s.env.OnActivity(activities.UploadActivityName, mock.Anything, mock.Anything).Return(nil) - s.env.OnActivity(setStatusLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) - s.env.OnActivity(setPreservationActionStatusLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + s.env.OnActivity(updatePackageLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) + s.env.OnActivity(completePreservationTaskLocalActivity, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) + s.env.OnActivity(activities.UploadActivityName, mock.Anything, mock.Anything).Return(nil, nil) + s.env.OnActivity(setStatusLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) + s.env.OnActivity(setPreservationActionStatusLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) s.env.OnActivity(createPreservationTaskLocalActivity, mock.Anything, mock.Anything, mock.Anything).Return(uint(0), nil) - s.env.OnActivity(activities.RejectPackageActivityName, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) - s.env.OnActivity(activities.CleanUpActivityName, sessionCtx, mock.AnythingOfType("*activities.CleanUpActivityParams")).Return(nil).Once() - s.env.OnActivity(activities.DeleteOriginalActivityName, sessionCtx, watcherName, key).Return(nil).Once() - s.env.OnActivity(updatePackageLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) - s.env.OnActivity(completePreservationActionLocalActivity, mock.Anything, mock.Anything, mock.Anything).Return(nil) + s.env.OnActivity(activities.RejectPackageActivityName, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) + s.env.OnActivity(activities.CleanUpActivityName, sessionCtx, mock.AnythingOfType("*activities.CleanUpActivityParams")).Return(nil, nil).Once() + s.env.OnActivity(activities.DeleteOriginalActivityName, sessionCtx, watcherName, key).Return(nil, nil).Once() + s.env.OnActivity(updatePackageLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) + s.env.OnActivity(completePreservationActionLocalActivity, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) s.env.ExecuteWorkflow( s.workflow.Execute,