Skip to content

Commit

Permalink
Move send to failed to its own method and struct
Browse files Browse the repository at this point in the history
[skip-codecov]
  • Loading branch information
jraddaoui committed Oct 18, 2024
1 parent dd678d1 commit 668d564
Showing 1 changed file with 61 additions and 43 deletions.
104 changes: 61 additions & 43 deletions internal/workflow/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,17 @@ type TransferInfo struct {
GlobalTaskQueue string
PreservationTaskQueue string

// Send to failed variables used to keep track of the SIP
// location, if it requires zipping (a3m PIP) and what activity
// needs to be called to be uploaded to the expected bucket.
SendToFailedPath string
SendToFailedActivityName string
SendToFailedNeedsZipping bool
// Send to failed information.
SendToFailed SendToFailed
}

// Send to failed variables used to keep track of the SIP/PIP
// location, if it requires zipping (a3m PIP) and what activity
// needs to be called to be uploaded to the expected bucket.
type SendToFailed struct {
Path string
ActivityName string
NeedsZipping bool
}

func (t *TransferInfo) Name() string {
Expand Down Expand Up @@ -338,36 +343,8 @@ func (w *ProcessingWorkflow) SessionHandler(
) (e error) {
var cleanup cleanupRegistry
defer func() {
if e != nil && tinfo.SendToFailedActivityName != "" && tinfo.SendToFailedPath != "" {
if tinfo.SendToFailedNeedsZipping {
var zipResult archivezip.Result
activityOpts := withActivityOptsForLocalAction(sessCtx)
err := temporalsdk_workflow.ExecuteActivity(
activityOpts,
archivezip.Name,
&archivezip.Params{SourceDir: tinfo.SendToFailedPath},
).Get(activityOpts, &zipResult)
if err != nil {
e = errors.Join(e, err)
return
}
tinfo.SendToFailedPath = zipResult.Path
}

var sendToFailedResult bucketupload.Result
activityOpts := withActivityOptsForLongLivedRequest(sessCtx)
err := temporalsdk_workflow.ExecuteActivity(
activityOpts,
tinfo.SendToFailedActivityName,
&bucketupload.Params{
Path: tinfo.SendToFailedPath,
Key: fmt.Sprintf("Failed_%s", tinfo.req.Key),
BufferSize: 100_000_000,
},
).Get(activityOpts, &sendToFailedResult)
if err != nil {
e = errors.Join(e, err)
}
if e != nil {
e = errors.Join(e, w.sendToFailedBucket(sessCtx, tinfo.SendToFailed, tinfo.req.Key))
}

w.sessionCleanup(sessCtx, &cleanup)
Expand Down Expand Up @@ -431,8 +408,8 @@ func (w *ProcessingWorkflow) SessionHandler(
// Delete download tmp dir when session ends.
cleanup.registerPath(filepath.Dir(downloadResult.Path))

tinfo.SendToFailedActivityName = activities.SendToFailedSIPsName
tinfo.SendToFailedPath = downloadResult.Path
tinfo.SendToFailed.Path = downloadResult.Path
tinfo.SendToFailed.ActivityName = activities.SendToFailedSIPsName
}

// Unarchive the transfer if it's not a directory and it's not part of the preprocessing child workflow.
Expand Down Expand Up @@ -853,9 +830,9 @@ func (w *ProcessingWorkflow) transferA3m(
cleanup.registerPath(bundleResult.FullPath)
}

tinfo.SendToFailedPath = tinfo.Bundle.FullPath
tinfo.SendToFailedActivityName = activities.SendToFailedPIPsName
tinfo.SendToFailedNeedsZipping = true
tinfo.SendToFailed.Path = tinfo.Bundle.FullPath
tinfo.SendToFailed.ActivityName = activities.SendToFailedPIPsName
tinfo.SendToFailed.NeedsZipping = true

// Send PIP to a3m for preservation.
{
Expand Down Expand Up @@ -918,8 +895,8 @@ func (w *ProcessingWorkflow) transferAM(ctx temporalsdk_workflow.Context, tinfo
return err
}

tinfo.SendToFailedPath = zipResult.Path
tinfo.SendToFailedActivityName = activities.SendToFailedPIPsName
tinfo.SendToFailed.Path = zipResult.Path
tinfo.SendToFailed.ActivityName = activities.SendToFailedPIPsName

// Upload PIP to AMSS.
activityOpts = temporalsdk_workflow.WithActivityOptions(ctx,
Expand Down Expand Up @@ -1170,3 +1147,44 @@ func (w *ProcessingWorkflow) completePreservationTask(

return nil
}

func (w *ProcessingWorkflow) sendToFailedBucket(
sessCtx temporalsdk_workflow.Context,
stf SendToFailed,
key string,
) error {
if stf.Path == "" || stf.ActivityName == "" {
return nil
}

if stf.NeedsZipping {
var zipResult archivezip.Result
activityOpts := withActivityOptsForLocalAction(sessCtx)
err := temporalsdk_workflow.ExecuteActivity(
activityOpts,
archivezip.Name,
&archivezip.Params{SourceDir: stf.Path},
).Get(activityOpts, &zipResult)
if err != nil {
return err
}
stf.Path = zipResult.Path
}

var sendToFailedResult bucketupload.Result
activityOpts := withActivityOptsForLongLivedRequest(sessCtx)
err := temporalsdk_workflow.ExecuteActivity(
activityOpts,
stf.ActivityName,
&bucketupload.Params{
Path: stf.Path,
Key: fmt.Sprintf("Failed_%s", key),
BufferSize: 100_000_000,
},
).Get(activityOpts, &sendToFailedResult)
if err != nil {
return err
}

return nil
}

0 comments on commit 668d564

Please sign in to comment.