From 668d5642becfe00656bf02171f9fee91c3a864d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Raddaoui=20Mar=C3=ADn?= Date: Fri, 18 Oct 2024 02:42:04 +0200 Subject: [PATCH] Move send to failed to its own method and struct [skip-codecov] --- internal/workflow/processing.go | 104 +++++++++++++++++++------------- 1 file changed, 61 insertions(+), 43 deletions(-) diff --git a/internal/workflow/processing.go b/internal/workflow/processing.go index 0f4bef975..c3d9dd742 100644 --- a/internal/workflow/processing.go +++ b/internal/workflow/processing.go @@ -112,12 +112,17 @@ type TransferInfo struct { GlobalTaskQueue string PreservationTaskQueue string - // Send to failed variables used to keep track of the SIP - // location, if it requires zipping (a3m PIP) and what activity - // needs to be called to be uploaded to the expected bucket. - SendToFailedPath string - SendToFailedActivityName string - SendToFailedNeedsZipping bool + // Send to failed information. + SendToFailed SendToFailed +} + +// Send to failed variables used to keep track of the SIP/PIP +// location, if it requires zipping (a3m PIP) and what activity +// needs to be called to be uploaded to the expected bucket. +type SendToFailed struct { + Path string + ActivityName string + NeedsZipping bool } func (t *TransferInfo) Name() string { @@ -338,36 +343,8 @@ func (w *ProcessingWorkflow) SessionHandler( ) (e error) { var cleanup cleanupRegistry defer func() { - if e != nil && tinfo.SendToFailedActivityName != "" && tinfo.SendToFailedPath != "" { - if tinfo.SendToFailedNeedsZipping { - var zipResult archivezip.Result - activityOpts := withActivityOptsForLocalAction(sessCtx) - err := temporalsdk_workflow.ExecuteActivity( - activityOpts, - archivezip.Name, - &archivezip.Params{SourceDir: tinfo.SendToFailedPath}, - ).Get(activityOpts, &zipResult) - if err != nil { - e = errors.Join(e, err) - return - } - tinfo.SendToFailedPath = zipResult.Path - } - - var sendToFailedResult bucketupload.Result - activityOpts := withActivityOptsForLongLivedRequest(sessCtx) - err := temporalsdk_workflow.ExecuteActivity( - activityOpts, - tinfo.SendToFailedActivityName, - &bucketupload.Params{ - Path: tinfo.SendToFailedPath, - Key: fmt.Sprintf("Failed_%s", tinfo.req.Key), - BufferSize: 100_000_000, - }, - ).Get(activityOpts, &sendToFailedResult) - if err != nil { - e = errors.Join(e, err) - } + if e != nil { + e = errors.Join(e, w.sendToFailedBucket(sessCtx, tinfo.SendToFailed, tinfo.req.Key)) } w.sessionCleanup(sessCtx, &cleanup) @@ -431,8 +408,8 @@ func (w *ProcessingWorkflow) SessionHandler( // Delete download tmp dir when session ends. cleanup.registerPath(filepath.Dir(downloadResult.Path)) - tinfo.SendToFailedActivityName = activities.SendToFailedSIPsName - tinfo.SendToFailedPath = downloadResult.Path + tinfo.SendToFailed.Path = downloadResult.Path + tinfo.SendToFailed.ActivityName = activities.SendToFailedSIPsName } // Unarchive the transfer if it's not a directory and it's not part of the preprocessing child workflow. @@ -853,9 +830,9 @@ func (w *ProcessingWorkflow) transferA3m( cleanup.registerPath(bundleResult.FullPath) } - tinfo.SendToFailedPath = tinfo.Bundle.FullPath - tinfo.SendToFailedActivityName = activities.SendToFailedPIPsName - tinfo.SendToFailedNeedsZipping = true + tinfo.SendToFailed.Path = tinfo.Bundle.FullPath + tinfo.SendToFailed.ActivityName = activities.SendToFailedPIPsName + tinfo.SendToFailed.NeedsZipping = true // Send PIP to a3m for preservation. { @@ -918,8 +895,8 @@ func (w *ProcessingWorkflow) transferAM(ctx temporalsdk_workflow.Context, tinfo return err } - tinfo.SendToFailedPath = zipResult.Path - tinfo.SendToFailedActivityName = activities.SendToFailedPIPsName + tinfo.SendToFailed.Path = zipResult.Path + tinfo.SendToFailed.ActivityName = activities.SendToFailedPIPsName // Upload PIP to AMSS. activityOpts = temporalsdk_workflow.WithActivityOptions(ctx, @@ -1170,3 +1147,44 @@ func (w *ProcessingWorkflow) completePreservationTask( return nil } + +func (w *ProcessingWorkflow) sendToFailedBucket( + sessCtx temporalsdk_workflow.Context, + stf SendToFailed, + key string, +) error { + if stf.Path == "" || stf.ActivityName == "" { + return nil + } + + if stf.NeedsZipping { + var zipResult archivezip.Result + activityOpts := withActivityOptsForLocalAction(sessCtx) + err := temporalsdk_workflow.ExecuteActivity( + activityOpts, + archivezip.Name, + &archivezip.Params{SourceDir: stf.Path}, + ).Get(activityOpts, &zipResult) + if err != nil { + return err + } + stf.Path = zipResult.Path + } + + var sendToFailedResult bucketupload.Result + activityOpts := withActivityOptsForLongLivedRequest(sessCtx) + err := temporalsdk_workflow.ExecuteActivity( + activityOpts, + stf.ActivityName, + &bucketupload.Params{ + Path: stf.Path, + Key: fmt.Sprintf("Failed_%s", key), + BufferSize: 100_000_000, + }, + ).Get(activityOpts, &sendToFailedResult) + if err != nil { + return err + } + + return nil +}