Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add result struct to activity function signature #881

Merged
merged 1 commit into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions internal/am/delete_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,21 @@ type DeleteTransferActivity struct {
logger logr.Logger
}

type DeleteTransferActivityResult struct{}

func NewDeleteTransferActivity(logger logr.Logger, client sftp.Client) *DeleteTransferActivity {
return &DeleteTransferActivity{client: client, logger: logger}
}

func (a *DeleteTransferActivity) Execute(ctx context.Context, params *DeleteTransferActivityParams) error {
func (a *DeleteTransferActivity) Execute(ctx context.Context, params *DeleteTransferActivityParams) (*DeleteTransferActivityResult, error) {
a.logger.V(1).Info("Execute DeleteTransferActivity",
"destination", params.Destination,
)

err := a.client.Delete(ctx, params.Destination)
if err != nil {
return fmt.Errorf("delete transfer: path: %q: %v", params.Destination, err)
return &DeleteTransferActivityResult{}, fmt.Errorf("delete transfer: path: %q: %v", params.Destination, err)
}

return nil
return &DeleteTransferActivityResult{}, nil
}
20 changes: 11 additions & 9 deletions internal/storage/activities/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,47 +11,49 @@ type CopyToPermanentLocationActivity struct {
storagesvc storage.Service
}

type CopyToPermanentLocationActivityResult struct{}

func NewCopyToPermanentLocationActivity(storagesvc storage.Service) *CopyToPermanentLocationActivity {
return &CopyToPermanentLocationActivity{storagesvc: storagesvc}
}

func (a *CopyToPermanentLocationActivity) Execute(ctx context.Context, params *storage.CopyToPermanentLocationActivityParams) error {
func (a *CopyToPermanentLocationActivity) Execute(ctx context.Context, params *storage.CopyToPermanentLocationActivityParams) (*CopyToPermanentLocationActivityResult, error) {
p, err := a.storagesvc.ReadPackage(ctx, params.AIPID)
if err != nil {
return err
return &CopyToPermanentLocationActivityResult{}, err
}

reader, err := a.storagesvc.PackageReader(ctx, p)
if err != nil {
return err
return &CopyToPermanentLocationActivityResult{}, err
}
defer reader.Close()

l, err := a.storagesvc.Location(ctx, params.LocationID)
if err != nil {
return err
return &CopyToPermanentLocationActivityResult{}, err
}

bucket, err := l.OpenBucket(ctx)
if err != nil {
return err
return &CopyToPermanentLocationActivityResult{}, err
}
defer bucket.Close()

writer, err := bucket.NewWriter(ctx, params.AIPID.String(), nil)
if err != nil {
return err
return &CopyToPermanentLocationActivityResult{}, err
}

_, copyErr := io.Copy(writer, reader)
closeErr := writer.Close()

if copyErr != nil {
return copyErr
return &CopyToPermanentLocationActivityResult{}, copyErr
}
if closeErr != nil {
return closeErr
return &CopyToPermanentLocationActivityResult{}, closeErr
}

return nil
return &CopyToPermanentLocationActivityResult{}, nil
}
2 changes: 1 addition & 1 deletion internal/storage/workflows/move_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestStorageMoveWorkflow(t *testing.T) {

// Worker activities
env.RegisterActivityWithOptions(activities.NewCopyToPermanentLocationActivity(storagesvc).Execute, temporalsdk_activity.RegisterOptions{Name: storage.CopyToPermanentLocationActivityName})
env.OnActivity(storage.CopyToPermanentLocationActivityName, mock.Anything, mock.Anything).Return(nil)
env.OnActivity(storage.CopyToPermanentLocationActivityName, mock.Anything, mock.Anything).Return(nil, nil)

env.ExecuteWorkflow(
NewStorageMoveWorkflow(storagesvc).Execute,
Expand Down
10 changes: 6 additions & 4 deletions internal/workflow/activities/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ type CleanUpActivityParams struct {
FullPath string
}

func (a *CleanUpActivity) Execute(ctx context.Context, params *CleanUpActivityParams) error {
type CleanUpActivityResult struct{}

func (a *CleanUpActivity) Execute(ctx context.Context, params *CleanUpActivityParams) (*CleanUpActivityResult, error) {
if params == nil || params.FullPath == "" {
return fmt.Errorf("error processing parameters: missing or empty")
return &CleanUpActivityResult{}, fmt.Errorf("error processing parameters: missing or empty")
}

if err := os.RemoveAll(params.FullPath); err != nil {
return fmt.Errorf("error removing transfer directory: %v", err)
return &CleanUpActivityResult{}, fmt.Errorf("error removing transfer directory: %v", err)
}

return nil
return &CleanUpActivityResult{}, nil
}
6 changes: 4 additions & 2 deletions internal/workflow/activities/delete_original.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ type DeleteOriginalActivity struct {
wsvc watcher.Service
}

type DeleteOriginalActivityResult struct{}

func NewDeleteOriginalActivity(wsvc watcher.Service) *DeleteOriginalActivity {
return &DeleteOriginalActivity{wsvc: wsvc}
}

func (a *DeleteOriginalActivity) Execute(ctx context.Context, watcherName, key string) error {
return a.wsvc.Delete(ctx, watcherName, key)
func (a *DeleteOriginalActivity) Execute(ctx context.Context, watcherName, key string) (*DeleteOriginalActivityResult, error) {
return &DeleteOriginalActivityResult{}, a.wsvc.Delete(ctx, watcherName, key)
}
6 changes: 4 additions & 2 deletions internal/workflow/activities/dispose_original.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ type DisposeOriginalActivity struct {
wsvc watcher.Service
}

type DisposeOriginalActivityResult struct{}

func NewDisposeOriginalActivity(wsvc watcher.Service) *DisposeOriginalActivity {
return &DisposeOriginalActivity{wsvc: wsvc}
}

func (a *DisposeOriginalActivity) Execute(ctx context.Context, watcherName, completedDir, key string) error {
return a.wsvc.Dispose(ctx, watcherName, key)
func (a *DisposeOriginalActivity) Execute(ctx context.Context, watcherName, completedDir, key string) (*DisposeOriginalActivityResult, error) {
return &DisposeOriginalActivityResult{}, a.wsvc.Dispose(ctx, watcherName, key)
}
5 changes: 3 additions & 2 deletions internal/workflow/activities/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,17 @@ func (a *DownloadActivity) Execute(ctx context.Context, params *DownloadActivity

destDir, err := os.MkdirTemp("", "enduro")
if err != nil {
return nil, temporal_tools.NewNonRetryableError(fmt.Errorf("make temp dir: %v", err))
return &DownloadActivityResult{}, temporal_tools.NewNonRetryableError(fmt.Errorf("make temp dir: %v", err))
}

dest := filepath.Clean(filepath.Join(destDir, params.Key))

ctx, span := trace.SpanFromContext(ctx).TracerProvider().Tracer(DownloadActivityName).Start(ctx, "download")
if err := a.wsvc.Download(ctx, dest, params.WatcherName, params.Key); err != nil {

span.RecordError(err)
span.End()
return nil, temporal_tools.NewNonRetryableError(fmt.Errorf("download: %v", err))
return &DownloadActivityResult{}, temporal_tools.NewNonRetryableError(fmt.Errorf("download: %v", err))
}
span.End()

Expand Down
18 changes: 12 additions & 6 deletions internal/workflow/activities/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ type MoveToPermanentStorageActivityParams struct {
LocationID uuid.UUID
}

type MoveToPermanentStorageActivityResult struct{}

type MoveToPermanentStorageActivity struct {
storageClient *goastorage.Client
}
Expand All @@ -26,7 +28,7 @@ func NewMoveToPermanentStorageActivity(storageClient *goastorage.Client) *MoveTo
}
}

func (a *MoveToPermanentStorageActivity) Execute(ctx context.Context, params *MoveToPermanentStorageActivityParams) error {
func (a *MoveToPermanentStorageActivity) Execute(ctx context.Context, params *MoveToPermanentStorageActivityParams) (*MoveToPermanentStorageActivityResult, error) {
childCtx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()

Expand All @@ -35,7 +37,7 @@ func (a *MoveToPermanentStorageActivity) Execute(ctx context.Context, params *Mo
LocationID: params.LocationID,
})

return err
return &MoveToPermanentStorageActivityResult{}, err
}

type PollMoveToPermanentStorageActivityParams struct {
Expand All @@ -46,13 +48,15 @@ type PollMoveToPermanentStorageActivity struct {
storageClient *goastorage.Client
}

type PollMoveToPermanentStorageActivityResult struct{}

func NewPollMoveToPermanentStorageActivity(storageClient *goastorage.Client) *PollMoveToPermanentStorageActivity {
return &PollMoveToPermanentStorageActivity{
storageClient: storageClient,
}
}

func (a *PollMoveToPermanentStorageActivity) Execute(ctx context.Context, params *PollMoveToPermanentStorageActivityParams) error {
func (a *PollMoveToPermanentStorageActivity) Execute(ctx context.Context, params *PollMoveToPermanentStorageActivityParams) (*PollMoveToPermanentStorageActivityResult, error) {
var g run.Group

{
Expand Down Expand Up @@ -105,7 +109,7 @@ func (a *PollMoveToPermanentStorageActivity) Execute(ctx context.Context, params
}

err := g.Run()
return err
return &PollMoveToPermanentStorageActivityResult{}, err
}

type RejectPackageActivityParams struct {
Expand All @@ -116,19 +120,21 @@ type RejectPackageActivity struct {
storageClient *goastorage.Client
}

type RejectPackageActivityResult struct{}

func NewRejectPackageActivity(storageClient *goastorage.Client) *RejectPackageActivity {
return &RejectPackageActivity{
storageClient: storageClient,
}
}

func (a *RejectPackageActivity) Execute(ctx context.Context, params *RejectPackageActivityParams) error {
func (a *RejectPackageActivity) Execute(ctx context.Context, params *RejectPackageActivityParams) (*RejectPackageActivityResult, error) {
childCtx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()

err := a.storageClient.Reject(childCtx, &goastorage.RejectPayload{
AipID: params.AIPID,
})

return err
return &RejectPackageActivityResult{}, err
}
20 changes: 11 additions & 9 deletions internal/workflow/activities/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,57 +20,59 @@ type UploadActivity struct {
storageClient *goastorage.Client
}

type UploadActivityResult struct{}

func NewUploadActivity(storageClient *goastorage.Client) *UploadActivity {
return &UploadActivity{
storageClient: storageClient,
}
}

func (a *UploadActivity) Execute(ctx context.Context, params *UploadActivityParams) error {
func (a *UploadActivity) Execute(ctx context.Context, params *UploadActivityParams) (*UploadActivityResult, error) {
childCtx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
res, err := a.storageClient.Submit(childCtx, &goastorage.SubmitPayload{
AipID: params.AIPID,
Name: params.Name,
})
if err != nil {
return err
return &UploadActivityResult{}, err
}

// Upload to MinIO using the upload pre-signed URL.
{
f, err := os.Open(params.AIPPath)
if err != nil {
return err
return &UploadActivityResult{}, err
}
defer f.Close() //#nosec G307 -- Errors returned by Close() here do not require specific handling.

uploadReq, err := http.NewRequestWithContext(ctx, http.MethodPut, res.URL, f)
if err != nil {
return nil
return &UploadActivityResult{}, nil
}
fi, err := f.Stat()
if err != nil {
return err
return &UploadActivityResult{}, err
}
uploadReq.ContentLength = fi.Size()
if err != nil {
return err
return &UploadActivityResult{}, err
}

minioClient := &http.Client{}
resp, err := minioClient.Do(uploadReq)
if err != nil {
return err
return &UploadActivityResult{}, err
}
if resp.StatusCode != http.StatusOK {
return errors.New("unexpected status code returned")
return &UploadActivityResult{}, errors.New("unexpected status code returned")
}
}

childCtx, cancel = context.WithTimeout(ctx, time.Second*5)
defer cancel()
err = a.storageClient.Update(childCtx, &goastorage.UpdatePayload{AipID: params.AIPID})

return err
return &UploadActivityResult{}, err
}
4 changes: 2 additions & 2 deletions internal/workflow/activities/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestUploadActivity(t *testing.T) {

activity := NewUploadActivity(storageClient)

err := activity.Execute(context.Background(), &UploadActivityParams{
_, err := activity.Execute(context.Background(), &UploadActivityParams{
AIPPath: tmpDir.Join("aip.7z"),
AIPID: uuid.New().String(),
Name: "aip.7z",
Expand Down Expand Up @@ -173,7 +173,7 @@ func TestUploadActivity(t *testing.T) {

activity := NewUploadActivity(storageClient)

err := activity.Execute(context.Background(), &UploadActivityParams{
_, err := activity.Execute(context.Background(), &UploadActivityParams{
AIPPath: tmpDir.Join("aip.7z"),
AIPID: uuid.New().String(),
Name: "aip.7z",
Expand Down
6 changes: 3 additions & 3 deletions internal/workflow/activities/zip.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (a *zipActivity) Execute(ctx context.Context, params *ZipActivityParams) (*
)

if params.SourceDir == "" {
return nil, fmt.Errorf("%s: missing source dir", ZipActivityName)
return &ZipActivityResult{}, fmt.Errorf("%s: missing source dir", ZipActivityName)
}

dest := params.DestPath
Expand All @@ -51,7 +51,7 @@ func (a *zipActivity) Execute(ctx context.Context, params *ZipActivityParams) (*

w, err := os.Create(dest) // #nosec G304 -- trusted path
if err != nil {
return nil, fmt.Errorf("%s: create: %v", ZipActivityName, err)
return &ZipActivityResult{}, fmt.Errorf("%s: create: %v", ZipActivityName, err)
}
defer w.Close()

Expand Down Expand Up @@ -91,7 +91,7 @@ func (a *zipActivity) Execute(ctx context.Context, params *ZipActivityParams) (*
return nil
})
if err != nil {
return nil, fmt.Errorf("%s: add files: %v", ZipActivityName, err)
return &ZipActivityResult{}, fmt.Errorf("%s: add files: %v", ZipActivityName, err)
}

return &ZipActivityResult{Path: dest}, nil
Expand Down
Loading
Loading