Skip to content

Commit

Permalink
Add result struct to activity returns
Browse files Browse the repository at this point in the history
- Change tests so that they expect empty structs and errors as returns.

[skip codecov]
  • Loading branch information
Diogenesoftoronto committed Mar 13, 2024
1 parent cbd1a6a commit cf21f7d
Show file tree
Hide file tree
Showing 15 changed files with 155 additions and 120 deletions.
8 changes: 5 additions & 3 deletions internal/am/delete_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,21 @@ type DeleteTransferActivity struct {
logger logr.Logger
}

type DeleteTransferActivityResult struct{}

func NewDeleteTransferActivity(logger logr.Logger, client sftp.Client) *DeleteTransferActivity {
return &DeleteTransferActivity{client: client, logger: logger}
}

func (a *DeleteTransferActivity) Execute(ctx context.Context, params *DeleteTransferActivityParams) error {
func (a *DeleteTransferActivity) Execute(ctx context.Context, params *DeleteTransferActivityParams) (*DeleteTransferActivityResult, error) {
a.logger.V(1).Info("Execute DeleteTransferActivity",
"destination", params.Destination,
)

err := a.client.Delete(ctx, params.Destination)
if err != nil {
return fmt.Errorf("delete transfer: path: %q: %v", params.Destination, err)
return &DeleteTransferActivityResult{}, fmt.Errorf("delete transfer: path: %q: %v", params.Destination, err)
}

return nil
return &DeleteTransferActivityResult{}, nil
}
20 changes: 11 additions & 9 deletions internal/storage/activities/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,47 +11,49 @@ type CopyToPermanentLocationActivity struct {
storagesvc storage.Service
}

type CopyToPermanentLocationActivityResult struct{}

func NewCopyToPermanentLocationActivity(storagesvc storage.Service) *CopyToPermanentLocationActivity {
return &CopyToPermanentLocationActivity{storagesvc: storagesvc}
}

func (a *CopyToPermanentLocationActivity) Execute(ctx context.Context, params *storage.CopyToPermanentLocationActivityParams) error {
func (a *CopyToPermanentLocationActivity) Execute(ctx context.Context, params *storage.CopyToPermanentLocationActivityParams) (*CopyToPermanentLocationActivityResult, error) {
p, err := a.storagesvc.ReadPackage(ctx, params.AIPID)
if err != nil {
return err
return &CopyToPermanentLocationActivityResult{}, err
}

reader, err := a.storagesvc.PackageReader(ctx, p)
if err != nil {
return err
return &CopyToPermanentLocationActivityResult{}, err
}
defer reader.Close()

l, err := a.storagesvc.Location(ctx, params.LocationID)
if err != nil {
return err
return &CopyToPermanentLocationActivityResult{}, err
}

bucket, err := l.OpenBucket(ctx)
if err != nil {
return err
return &CopyToPermanentLocationActivityResult{}, err
}
defer bucket.Close()

writer, err := bucket.NewWriter(ctx, params.AIPID.String(), nil)
if err != nil {
return err
return &CopyToPermanentLocationActivityResult{}, err
}

_, copyErr := io.Copy(writer, reader)
closeErr := writer.Close()

if copyErr != nil {
return copyErr
return &CopyToPermanentLocationActivityResult{}, copyErr
}
if closeErr != nil {
return closeErr
return &CopyToPermanentLocationActivityResult{}, closeErr
}

return nil
return &CopyToPermanentLocationActivityResult{}, nil
}
2 changes: 1 addition & 1 deletion internal/storage/workflows/move_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestStorageMoveWorkflow(t *testing.T) {

// Worker activities
env.RegisterActivityWithOptions(activities.NewCopyToPermanentLocationActivity(storagesvc).Execute, temporalsdk_activity.RegisterOptions{Name: storage.CopyToPermanentLocationActivityName})
env.OnActivity(storage.CopyToPermanentLocationActivityName, mock.Anything, mock.Anything).Return(nil)
env.OnActivity(storage.CopyToPermanentLocationActivityName, mock.Anything, mock.Anything).Return(nil, nil)

env.ExecuteWorkflow(
NewStorageMoveWorkflow(storagesvc).Execute,
Expand Down
10 changes: 5 additions & 5 deletions internal/workflow/activities/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (a *BundleActivity) Execute(ctx context.Context, params *BundleActivityPara
if params.TransferDir == "" {
params.TransferDir, err = os.MkdirTemp("", "*-enduro-transfer")
if err != nil {
return nil, err
return &BundleActivityResult{}, err
}
}

Expand All @@ -96,24 +96,24 @@ func (a *BundleActivity) Execute(ctx context.Context, params *BundleActivityPara
}
}
if err != nil {
return nil, temporal_tools.NewNonRetryableError(err)
return &BundleActivityResult{}, temporal_tools.NewNonRetryableError(err)
}

err = unbag(res.FullPath)
if err != nil {
return nil, temporal_tools.NewNonRetryableError(err)
return &BundleActivityResult{}, temporal_tools.NewNonRetryableError(err)
}

res.RelPath, err = filepath.Rel(params.TransferDir, res.FullPath)
if err != nil {
return nil, temporal_tools.NewNonRetryableError(fmt.Errorf(
return &BundleActivityResult{}, temporal_tools.NewNonRetryableError(fmt.Errorf(
"error calculating relative path to transfer (base=%q, target=%q): %v",
params.TransferDir, res.FullPath, err,
))
}

if err = setPermissions(res.FullPath); err != nil {
return nil, temporal_tools.NewNonRetryableError(
return &BundleActivityResult{}, temporal_tools.NewNonRetryableError(
fmt.Errorf("set permissions: %v", err),
)
}
Expand Down
10 changes: 6 additions & 4 deletions internal/workflow/activities/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ type CleanUpActivityParams struct {
FullPath string
}

func (a *CleanUpActivity) Execute(ctx context.Context, params *CleanUpActivityParams) error {
type CleanUpActivityResult struct{}

func (a *CleanUpActivity) Execute(ctx context.Context, params *CleanUpActivityParams) (*CleanUpActivityResult, error) {
if params == nil || params.FullPath == "" {
return fmt.Errorf("error processing parameters: missing or empty")
return &CleanUpActivityResult{}, fmt.Errorf("error processing parameters: missing or empty")
}

if err := os.RemoveAll(params.FullPath); err != nil {
return fmt.Errorf("error removing transfer directory: %v", err)
return &CleanUpActivityResult{}, fmt.Errorf("error removing transfer directory: %v", err)
}

return nil
return &CleanUpActivityResult{}, nil
}
6 changes: 4 additions & 2 deletions internal/workflow/activities/delete_original.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ type DeleteOriginalActivity struct {
wsvc watcher.Service
}

type DeleteOriginalActivityResult struct{}

func NewDeleteOriginalActivity(wsvc watcher.Service) *DeleteOriginalActivity {
return &DeleteOriginalActivity{wsvc: wsvc}
}

func (a *DeleteOriginalActivity) Execute(ctx context.Context, watcherName, key string) error {
return a.wsvc.Delete(ctx, watcherName, key)
func (a *DeleteOriginalActivity) Execute(ctx context.Context, watcherName, key string) (*DeleteOriginalActivityResult, error) {
return &DeleteOriginalActivityResult{}, a.wsvc.Delete(ctx, watcherName, key)
}
6 changes: 4 additions & 2 deletions internal/workflow/activities/dispose_original.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ type DisposeOriginalActivity struct {
wsvc watcher.Service
}

type DisposeOriginalActivityResult struct{}

func NewDisposeOriginalActivity(wsvc watcher.Service) *DisposeOriginalActivity {
return &DisposeOriginalActivity{wsvc: wsvc}
}

func (a *DisposeOriginalActivity) Execute(ctx context.Context, watcherName, completedDir, key string) error {
return a.wsvc.Dispose(ctx, watcherName, key)
func (a *DisposeOriginalActivity) Execute(ctx context.Context, watcherName, completedDir, key string) (*DisposeOriginalActivityResult, error) {
return &DisposeOriginalActivityResult{}, a.wsvc.Dispose(ctx, watcherName, key)
}
5 changes: 3 additions & 2 deletions internal/workflow/activities/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,17 @@ func (a *DownloadActivity) Execute(ctx context.Context, params *DownloadActivity

destDir, err := os.MkdirTemp("", "enduro")
if err != nil {
return nil, temporal_tools.NewNonRetryableError(fmt.Errorf("make temp dir: %v", err))
return &DownloadActivityResult{}, temporal_tools.NewNonRetryableError(fmt.Errorf("make temp dir: %v", err))
}

dest := filepath.Clean(filepath.Join(destDir, params.Key))

ctx, span := trace.SpanFromContext(ctx).TracerProvider().Tracer(DownloadActivityName).Start(ctx, "download")
if err := a.wsvc.Download(ctx, dest, params.WatcherName, params.Key); err != nil {

span.RecordError(err)
span.End()
return nil, temporal_tools.NewNonRetryableError(fmt.Errorf("download: %v", err))
return &DownloadActivityResult{}, temporal_tools.NewNonRetryableError(fmt.Errorf("download: %v", err))
}
span.End()

Expand Down
18 changes: 12 additions & 6 deletions internal/workflow/activities/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ type MoveToPermanentStorageActivityParams struct {
LocationID uuid.UUID
}

type MoveToPermanentStorageActivityResult struct{}

type MoveToPermanentStorageActivity struct {
storageClient *goastorage.Client
}
Expand All @@ -26,7 +28,7 @@ func NewMoveToPermanentStorageActivity(storageClient *goastorage.Client) *MoveTo
}
}

func (a *MoveToPermanentStorageActivity) Execute(ctx context.Context, params *MoveToPermanentStorageActivityParams) error {
func (a *MoveToPermanentStorageActivity) Execute(ctx context.Context, params *MoveToPermanentStorageActivityParams) (*MoveToPermanentStorageActivityResult, error) {
childCtx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()

Expand All @@ -35,7 +37,7 @@ func (a *MoveToPermanentStorageActivity) Execute(ctx context.Context, params *Mo
LocationID: params.LocationID,
})

return err
return &MoveToPermanentStorageActivityResult{}, err
}

type PollMoveToPermanentStorageActivityParams struct {
Expand All @@ -46,13 +48,15 @@ type PollMoveToPermanentStorageActivity struct {
storageClient *goastorage.Client
}

type PollMoveToPermanentStorageActivityResult struct{}

func NewPollMoveToPermanentStorageActivity(storageClient *goastorage.Client) *PollMoveToPermanentStorageActivity {
return &PollMoveToPermanentStorageActivity{
storageClient: storageClient,
}
}

func (a *PollMoveToPermanentStorageActivity) Execute(ctx context.Context, params *PollMoveToPermanentStorageActivityParams) error {
func (a *PollMoveToPermanentStorageActivity) Execute(ctx context.Context, params *PollMoveToPermanentStorageActivityParams) (*PollMoveToPermanentStorageActivityResult, error) {
var g run.Group

{
Expand Down Expand Up @@ -105,7 +109,7 @@ func (a *PollMoveToPermanentStorageActivity) Execute(ctx context.Context, params
}

err := g.Run()
return err
return &PollMoveToPermanentStorageActivityResult{}, err
}

type RejectPackageActivityParams struct {
Expand All @@ -116,19 +120,21 @@ type RejectPackageActivity struct {
storageClient *goastorage.Client
}

type RejectPackageActivityResult struct{}

func NewRejectPackageActivity(storageClient *goastorage.Client) *RejectPackageActivity {
return &RejectPackageActivity{
storageClient: storageClient,
}
}

func (a *RejectPackageActivity) Execute(ctx context.Context, params *RejectPackageActivityParams) error {
func (a *RejectPackageActivity) Execute(ctx context.Context, params *RejectPackageActivityParams) (*RejectPackageActivityResult, error) {
childCtx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()

err := a.storageClient.Reject(childCtx, &goastorage.RejectPayload{
AipID: params.AIPID,
})

return err
return &RejectPackageActivityResult{}, err
}
20 changes: 11 additions & 9 deletions internal/workflow/activities/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,57 +20,59 @@ type UploadActivity struct {
storageClient *goastorage.Client
}

type UploadActivityResult struct{}

func NewUploadActivity(storageClient *goastorage.Client) *UploadActivity {
return &UploadActivity{
storageClient: storageClient,
}
}

func (a *UploadActivity) Execute(ctx context.Context, params *UploadActivityParams) error {
func (a *UploadActivity) Execute(ctx context.Context, params *UploadActivityParams) (*UploadActivityResult, error) {
childCtx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
res, err := a.storageClient.Submit(childCtx, &goastorage.SubmitPayload{
AipID: params.AIPID,
Name: params.Name,
})
if err != nil {
return err
return &UploadActivityResult{}, err
}

// Upload to MinIO using the upload pre-signed URL.
{
f, err := os.Open(params.AIPPath)
if err != nil {
return err
return &UploadActivityResult{}, err
}
defer f.Close() //#nosec G307 -- Errors returned by Close() here do not require specific handling.

uploadReq, err := http.NewRequestWithContext(ctx, http.MethodPut, res.URL, f)
if err != nil {
return nil
return &UploadActivityResult{}, nil
}
fi, err := f.Stat()
if err != nil {
return err
return &UploadActivityResult{}, err
}
uploadReq.ContentLength = fi.Size()
if err != nil {
return err
return &UploadActivityResult{}, err
}

minioClient := &http.Client{}
resp, err := minioClient.Do(uploadReq)
if err != nil {
return err
return &UploadActivityResult{}, err
}
if resp.StatusCode != http.StatusOK {
return errors.New("unexpected status code returned")
return &UploadActivityResult{}, errors.New("unexpected status code returned")
}
}

childCtx, cancel = context.WithTimeout(ctx, time.Second*5)
defer cancel()
err = a.storageClient.Update(childCtx, &goastorage.UpdatePayload{AipID: params.AIPID})

return err
return &UploadActivityResult{}, err
}
4 changes: 2 additions & 2 deletions internal/workflow/activities/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestUploadActivity(t *testing.T) {

activity := NewUploadActivity(storageClient)

err := activity.Execute(context.Background(), &UploadActivityParams{
_, err := activity.Execute(context.Background(), &UploadActivityParams{
AIPPath: tmpDir.Join("aip.7z"),
AIPID: uuid.New().String(),
Name: "aip.7z",
Expand Down Expand Up @@ -173,7 +173,7 @@ func TestUploadActivity(t *testing.T) {

activity := NewUploadActivity(storageClient)

err := activity.Execute(context.Background(), &UploadActivityParams{
_, err := activity.Execute(context.Background(), &UploadActivityParams{
AIPPath: tmpDir.Join("aip.7z"),
AIPID: uuid.New().String(),
Name: "aip.7z",
Expand Down
6 changes: 3 additions & 3 deletions internal/workflow/activities/zip.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (a *zipActivity) Execute(ctx context.Context, params *ZipActivityParams) (*
)

if params.SourceDir == "" {
return nil, fmt.Errorf("%s: missing source dir", ZipActivityName)
return &ZipActivityResult{}, fmt.Errorf("%s: missing source dir", ZipActivityName)
}

dest := params.DestPath
Expand All @@ -51,7 +51,7 @@ func (a *zipActivity) Execute(ctx context.Context, params *ZipActivityParams) (*

w, err := os.Create(dest) // #nosec G304 -- trusted path
if err != nil {
return nil, fmt.Errorf("%s: create: %v", ZipActivityName, err)
return &ZipActivityResult{}, fmt.Errorf("%s: create: %v", ZipActivityName, err)
}
defer w.Close()

Expand Down Expand Up @@ -91,7 +91,7 @@ func (a *zipActivity) Execute(ctx context.Context, params *ZipActivityParams) (*
return nil
})
if err != nil {
return nil, fmt.Errorf("%s: add files: %v", ZipActivityName, err)
return &ZipActivityResult{}, fmt.Errorf("%s: add files: %v", ZipActivityName, err)
}

return &ZipActivityResult{Path: dest}, nil
Expand Down
Loading

0 comments on commit cf21f7d

Please sign in to comment.