From fd19cff784c405190a6a0a49b19b80b349285abc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Raddaoui=20Mar=C3=ADn?= Date: Thu, 25 Apr 2024 08:11:49 +0200 Subject: [PATCH] WIP: Add failed transfers/SIPs buckets --- cmd/enduro-a3m-worker/main.go | 38 +++ cmd/enduro-am-worker/main.go | 38 +++ enduro.toml | 16 ++ hack/kube/base/minio-setup-buckets-job.yaml | 2 + .../tools/minio-recreate-buckets-job.yaml | 2 + internal/config/config.go | 3 + .../activities/send_to_failed_bucket.go | 67 ++++++ internal/workflow/processing.go | 218 ++++++++++-------- 8 files changed, 283 insertions(+), 101 deletions(-) create mode 100644 internal/workflow/activities/send_to_failed_bucket.go diff --git a/cmd/enduro-a3m-worker/main.go b/cmd/enduro-a3m-worker/main.go index c78bb58de..f43f4a36a 100644 --- a/cmd/enduro-a3m-worker/main.go +++ b/cmd/enduro-a3m-worker/main.go @@ -26,6 +26,7 @@ import ( temporalsdk_interceptor "go.temporal.io/sdk/interceptor" temporalsdk_worker "go.temporal.io/sdk/worker" goahttp "goa.design/goa/v3/http" + "gocloud.dev/blob" "github.com/artefactual-sdps/enduro/internal/a3m" "github.com/artefactual-sdps/enduro/internal/api/auth" @@ -38,6 +39,7 @@ import ( "github.com/artefactual-sdps/enduro/internal/persistence" entclient "github.com/artefactual-sdps/enduro/internal/persistence/ent/client" entdb "github.com/artefactual-sdps/enduro/internal/persistence/ent/db" + "github.com/artefactual-sdps/enduro/internal/storage" "github.com/artefactual-sdps/enduro/internal/telemetry" "github.com/artefactual-sdps/enduro/internal/temporal" "github.com/artefactual-sdps/enduro/internal/version" @@ -181,6 +183,38 @@ func main() { } } + // Set-up failed transfers bucket. + var ft *blob.Bucket + { + fl, err := storage.NewInternalLocation(&cfg.FailedTransfers) + if err != nil { + logger.Error(err, "Error setting up failed transfers location.") + os.Exit(1) + } + ft, err = fl.OpenBucket(ctx) + if err != nil { + logger.Error(err, "Error getting failed transfers bucket.") + os.Exit(1) + } + } + defer ft.Close() + + // Set-up failed SIPs bucket. + var fs *blob.Bucket + { + fl, err := storage.NewInternalLocation(&cfg.FailedSIPs) + if err != nil { + logger.Error(err, "Error setting up failed SIPs location.") + os.Exit(1) + } + fs, err = fl.OpenBucket(ctx) + if err != nil { + logger.Error(err, "Error getting failed SIPs bucket.") + os.Exit(1) + } + } + defer fs.Close() + var g run.Group // Activity worker. @@ -273,6 +307,10 @@ func main() { activities.NewRejectPackageActivity(storageClient).Execute, temporalsdk_activity.RegisterOptions{Name: activities.RejectPackageActivityName}, ) + w.RegisterActivityWithOptions( + activities.NewSendToFailedBuckeActivity(ft, fs).Execute, + temporalsdk_activity.RegisterOptions{Name: activities.SendToFailedBucketName}, + ) g.Add( func() error { diff --git a/cmd/enduro-am-worker/main.go b/cmd/enduro-am-worker/main.go index ad4a37292..501ebb94d 100644 --- a/cmd/enduro-am-worker/main.go +++ b/cmd/enduro-am-worker/main.go @@ -27,6 +27,7 @@ import ( temporalsdk_contrib_opentelemetry "go.temporal.io/sdk/contrib/opentelemetry" temporalsdk_interceptor "go.temporal.io/sdk/interceptor" temporalsdk_worker "go.temporal.io/sdk/worker" + "gocloud.dev/blob" "github.com/artefactual-sdps/enduro/internal/am" "github.com/artefactual-sdps/enduro/internal/api/auth" @@ -38,6 +39,7 @@ import ( entclient "github.com/artefactual-sdps/enduro/internal/persistence/ent/client" entdb "github.com/artefactual-sdps/enduro/internal/persistence/ent/db" "github.com/artefactual-sdps/enduro/internal/sftp" + "github.com/artefactual-sdps/enduro/internal/storage" "github.com/artefactual-sdps/enduro/internal/telemetry" "github.com/artefactual-sdps/enduro/internal/temporal" "github.com/artefactual-sdps/enduro/internal/version" @@ -172,6 +174,38 @@ func main() { ) } + // Set-up failed transfers bucket. + var ft *blob.Bucket + { + fl, err := storage.NewInternalLocation(&cfg.FailedTransfers) + if err != nil { + logger.Error(err, "Error setting up failed transfers location.") + os.Exit(1) + } + ft, err = fl.OpenBucket(ctx) + if err != nil { + logger.Error(err, "Error getting failed transfers bucket.") + os.Exit(1) + } + } + defer ft.Close() + + // Set-up failed SIPs bucket. + var fs *blob.Bucket + { + fl, err := storage.NewInternalLocation(&cfg.FailedSIPs) + if err != nil { + logger.Error(err, "Error setting up failed SIPs location.") + os.Exit(1) + } + fs, err = fl.OpenBucket(ctx) + if err != nil { + logger.Error(err, "Error getting failed SIPs bucket.") + os.Exit(1) + } + } + defer fs.Close() + var g run.Group // Activity worker. @@ -259,6 +293,10 @@ func main() { activities.NewCleanUpActivity().Execute, temporalsdk_activity.RegisterOptions{Name: activities.CleanUpActivityName}, ) + w.RegisterActivityWithOptions( + activities.NewSendToFailedBuckeActivity(ft, fs).Execute, + temporalsdk_activity.RegisterOptions{Name: activities.SendToFailedBucketName}, + ) g.Add( func() error { diff --git a/enduro.toml b/enduro.toml index 768a8cd64..20e4d4c27 100644 --- a/enduro.toml +++ b/enduro.toml @@ -152,3 +152,19 @@ sharedPath = "/home/enduro/preprocessing" namespace = "default" taskQueue = "preprocessing" workflowName = "preprocessing" + +[failedtransfers] +endpoint = "http://minio.enduro-sdps:9000" +pathStyle = true +key = "minio" +secret = "minio123" +region = "us-west-1" +bucket = "failed-transfers" + +[failedsips] +endpoint = "http://minio.enduro-sdps:9000" +pathStyle = true +key = "minio" +secret = "minio123" +region = "us-west-1" +bucket = "failed-sips" diff --git a/hack/kube/base/minio-setup-buckets-job.yaml b/hack/kube/base/minio-setup-buckets-job.yaml index ae54d3e3f..6917427bf 100644 --- a/hack/kube/base/minio-setup-buckets-job.yaml +++ b/hack/kube/base/minio-setup-buckets-job.yaml @@ -28,6 +28,8 @@ spec: "-c", "mc alias set enduro http://minio.enduro-sdps:9000 ${MINIO_USER} ${MINIO_PASSWORD}; mc mb enduro/sips --ignore-existing; + mc mb enduro/failed-transfers --ignore-existing; + mc mb enduro/failed-sips --ignore-existing; mc mb enduro/aips --ignore-existing; mc mb enduro/perma-aips-1 --ignore-existing; mc mb enduro/perma-aips-2 --ignore-existing; diff --git a/hack/kube/tools/minio-recreate-buckets-job.yaml b/hack/kube/tools/minio-recreate-buckets-job.yaml index 39db9e04d..df35d6ca0 100644 --- a/hack/kube/tools/minio-recreate-buckets-job.yaml +++ b/hack/kube/tools/minio-recreate-buckets-job.yaml @@ -28,6 +28,8 @@ spec: "mc alias set enduro http://minio.enduro-sdps:9000 ${MINIO_USER} ${MINIO_PASSWORD}; mc rb --force --dangerous enduro; mc mb enduro/sips --ignore-existing; + mc mb enduro/failed-transfers --ignore-existing; + mc mb enduro/failed-sips --ignore-existing; mc mb enduro/aips --ignore-existing; mc mb enduro/perma-aips-1 --ignore-existing; mc mb enduro/perma-aips-2 --ignore-existing; diff --git a/internal/config/config.go b/internal/config/config.go index 3037b32b2..feff9f91b 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -44,6 +44,9 @@ type Configuration struct { Upload upload.Config Watcher watcher.Config Telemetry telemetry.Config + + FailedTransfers storage.LocationConfig + FailedSIPs storage.LocationConfig } func (c Configuration) Validate() error { diff --git a/internal/workflow/activities/send_to_failed_bucket.go b/internal/workflow/activities/send_to_failed_bucket.go new file mode 100644 index 000000000..e6f1377f2 --- /dev/null +++ b/internal/workflow/activities/send_to_failed_bucket.go @@ -0,0 +1,67 @@ +package activities + +import ( + "context" + "os" + + "gocloud.dev/blob" +) + +const ( + SendToFailedBucketName = "send-to-failed-bucket" + FailureSIP = "sip" + FailureTransfer = "transfer" +) + +type SendToFailedBucketActivity struct { + failedTransferBucket *blob.Bucket + failedSipBucket *blob.Bucket +} + +func NewSendToFailedBuckeActivity(transfer, sip *blob.Bucket) *SendToFailedBucketActivity { + return &SendToFailedBucketActivity{ + failedTransferBucket: transfer, + failedSipBucket: sip, + } +} + +type SendToFailedBucketParams struct { + Type string + Path string + Key string +} + +type SendToFailedBucketResult struct { + FailedKey string +} + +func (sf *SendToFailedBucketActivity) Execute( + ctx context.Context, + params *SendToFailedBucketParams, +) (*SendToFailedBucketResult, error) { + res := &SendToFailedBucketResult{} + + f, err := os.Open(params.Path) + if err != nil { + return nil, err + } + defer f.Close() + + res.FailedKey = "Failed_" + params.Key + writerOptions := &blob.WriterOptions{ + ContentType: "application/octet-stream", + BufferSize: 100_000_000, + } + + switch params.Type { + case FailureTransfer: + err = sf.failedTransferBucket.Upload(ctx, res.FailedKey, f, writerOptions) + case FailureSIP: + err = sf.failedSipBucket.Upload(ctx, res.FailedKey, f, writerOptions) + } + if err != nil { + return nil, err + } + + return res, nil +} diff --git a/internal/workflow/processing.go b/internal/workflow/processing.go index 7b4d5251b..37f582782 100644 --- a/internal/workflow/processing.go +++ b/internal/workflow/processing.go @@ -91,6 +91,9 @@ type TransferInfo struct { // It is populated by the workflow request. GlobalTaskQueue string PreservationTaskQueue string + + SendToFailedPath string + SendToFailedType string } func (t *TransferInfo) Name() string { @@ -130,8 +133,7 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *pack err = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, createPackageLocalActivity, w.logger, w.pkgsvc, &createPackageLocalActivityParams{ Key: req.Key, Status: status, - }). - Get(activityOpts, &tinfo.req.PackageID) + }).Get(activityOpts, &tinfo.req.PackageID) } else { // TODO: investigate better way to reset the package_. err = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.logger, w.pkgsvc, &updatePackageLocalActivityParams{ @@ -165,8 +167,7 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *pack SIPID: tinfo.SIPID, StoredAt: tinfo.StoredAt, Status: status, - }). - Get(activityOpts, nil) + }).Get(activityOpts, nil) if paStatus != enums.PreservationActionStatusDone { paStatus = enums.PreservationActionStatusError @@ -176,8 +177,7 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *pack PreservationActionID: tinfo.PreservationActionID, Status: paStatus, CompletedAt: temporalsdk_workflow.Now(dctx).UTC(), - }). - Get(activityOpts, nil) + }).Get(activityOpts, nil) }() // Activities running within a session. @@ -268,18 +268,38 @@ func (w *ProcessingWorkflow) SessionHandler( sessCtx temporalsdk_workflow.Context, attempt int, tinfo *TransferInfo, -) error { - defer temporalsdk_workflow.CompleteSession(sessCtx) +) (e error) { + defer func() { + if e != nil && tinfo.SendToFailedType != "" && tinfo.SendToFailedPath != "" { + // TODO: make sure tinfo.SendToFailedPath is zipped. + activityOpts := withActivityOptsForLongLivedRequest(sessCtx) + sendErr := temporalsdk_workflow.ExecuteActivity( + activityOpts, + activities.SendToFailedBucketName, + &activities.SendToFailedBucketParams{ + Type: tinfo.SendToFailedType, + Path: tinfo.SendToFailedPath, + Key: tinfo.req.Key, + }, + ).Get(activityOpts, nil) + if sendErr != nil { + e = errors.Join(e, sendErr) + } + } + + temporalsdk_workflow.CompleteSession(sessCtx) + }() packageStartedAt := temporalsdk_workflow.Now(sessCtx).UTC() + tinfo.SendToFailedType = activities.FailureTransfer // Set in-progress status. { ctx := withLocalActivityOpts(sessCtx) - err := temporalsdk_workflow.ExecuteLocalActivity(ctx, setStatusInProgressLocalActivity, w.pkgsvc, tinfo.req.PackageID, packageStartedAt). + e = temporalsdk_workflow.ExecuteLocalActivity(ctx, setStatusInProgressLocalActivity, w.pkgsvc, tinfo.req.PackageID, packageStartedAt). Get(ctx, nil) - if err != nil { - return err + if e != nil { + return e } } @@ -294,16 +314,15 @@ func (w *ProcessingWorkflow) SessionHandler( } ctx := withLocalActivityOpts(sessCtx) - err := temporalsdk_workflow.ExecuteLocalActivity(ctx, createPreservationActionLocalActivity, w.pkgsvc, &createPreservationActionLocalActivityParams{ + e = temporalsdk_workflow.ExecuteLocalActivity(ctx, createPreservationActionLocalActivity, w.pkgsvc, &createPreservationActionLocalActivityParams{ WorkflowID: temporalsdk_workflow.GetInfo(ctx).WorkflowExecution.ID, Type: preservationActionType, Status: enums.PreservationActionStatusInProgress, StartedAt: packageStartedAt, PackageID: tinfo.req.PackageID, - }). - Get(ctx, &tinfo.PreservationActionID) - if err != nil { - return err + }).Get(ctx, &tinfo.PreservationActionID) + if e != nil { + return e } } } @@ -319,19 +338,20 @@ func (w *ProcessingWorkflow) SessionHandler( if w.cfg.Preprocessing.Enabled { params.DestinationPath = w.cfg.Preprocessing.SharedPath } - err := temporalsdk_workflow.ExecuteActivity(activityOpts, activities.DownloadActivityName, params). + e = temporalsdk_workflow.ExecuteActivity(activityOpts, activities.DownloadActivityName, params). Get(activityOpts, &downloadResult) - if err != nil { - return err + if e != nil { + return e } tinfo.TempPath = downloadResult.Path + tinfo.SendToFailedPath = downloadResult.Path } // Unarchive the transfer if it's not a directory and it's not part of the preprocessing child workflow. if !tinfo.req.IsDir && (!w.cfg.Preprocessing.Enabled || !w.cfg.Preprocessing.Extract) { activityOpts := withActivityOptsForLocalAction(sessCtx) var result activities.UnarchiveActivityResult - err := temporalsdk_workflow.ExecuteActivity( + e := temporalsdk_workflow.ExecuteActivity( activityOpts, activities.UnarchiveActivityName, &activities.UnarchiveActivityParams{ @@ -339,16 +359,16 @@ func (w *ProcessingWorkflow) SessionHandler( StripTopLevelDir: tinfo.req.StripTopLevelDir, }, ).Get(activityOpts, &result) - if err != nil { - return err + if e != nil { + return e } tinfo.TempPath = result.DestPath tinfo.req.IsDir = result.IsDir } // Preprocessing child workflow. - if err := w.preprocessing(sessCtx, tinfo); err != nil { - return err + if e := w.preprocessing(sessCtx, tinfo); e != nil { + return e } // Bundle. @@ -362,7 +382,7 @@ func (w *ProcessingWorkflow) SessionHandler( activityOpts := withActivityOptsForLongLivedRequest(sessCtx) var bundleResult activities.BundleActivityResult - err := temporalsdk_workflow.ExecuteActivity( + e = temporalsdk_workflow.ExecuteActivity( activityOpts, activities.BundleActivityName, &activities.BundleActivityParams{ @@ -371,11 +391,12 @@ func (w *ProcessingWorkflow) SessionHandler( IsDir: tinfo.req.IsDir, }, ).Get(activityOpts, &bundleResult) - if err != nil { - return err + if e != nil { + return e } tinfo.Bundle = bundleResult + tinfo.SendToFailedPath = tinfo.Bundle.FullPath } // Delete local temporary files. @@ -385,35 +406,37 @@ func (w *ProcessingWorkflow) SessionHandler( activityOpts := withActivityOptsForRequest(sessCtx) _ = temporalsdk_workflow.ExecuteActivity(activityOpts, activities.CleanUpActivityName, &activities.CleanUpActivityParams{ FullPath: tinfo.Bundle.FullPath, - }). - Get(activityOpts, nil) + }).Get(activityOpts, nil) } }() + tinfo.SendToFailedType = activities.FailureSIP + // Do preservation activities. { - var err error if w.cfg.Preservation.TaskQueue == temporal.AmWorkerTaskQueue { - err = w.transferAM(sessCtx, tinfo) + e = w.transferAM(sessCtx, tinfo) } else { - err = w.transferA3m(sessCtx, tinfo) + e = w.transferA3m(sessCtx, tinfo) } - if err != nil { - return err + if e != nil { + return e } } // Persist SIPID. { activityOpts := withLocalActivityOpts(sessCtx) - _ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.logger, w.pkgsvc, &updatePackageLocalActivityParams{ + e = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.logger, w.pkgsvc, &updatePackageLocalActivityParams{ PackageID: tinfo.req.PackageID, Key: tinfo.req.Key, SIPID: tinfo.SIPID, StoredAt: tinfo.StoredAt, Status: enums.PackageStatusInProgress, - }). - Get(activityOpts, nil) + }).Get(activityOpts, nil) + if e != nil { + return e + } } // Stop here for the Archivematica workflow. @@ -427,17 +450,16 @@ func (w *ProcessingWorkflow) SessionHandler( // Add preservation task for upload to review bucket. if !tinfo.req.AutoApproveAIP { ctx := withLocalActivityOpts(sessCtx) - err := temporalsdk_workflow.ExecuteLocalActivity(ctx, createPreservationTaskLocalActivity, w.pkgsvc, &createPreservationTaskLocalActivityParams{ + e = temporalsdk_workflow.ExecuteLocalActivity(ctx, createPreservationTaskLocalActivity, w.pkgsvc, &createPreservationTaskLocalActivityParams{ TaskID: uuid.NewString(), Name: "Move AIP", Status: enums.PreservationTaskStatusInProgress, StartedAt: temporalsdk_workflow.Now(sessCtx).UTC(), Note: "Moving to review bucket", PreservationActionID: tinfo.PreservationActionID, - }). - Get(ctx, &uploadPreservationTaskID) - if err != nil { - return err + }).Get(ctx, &uploadPreservationTaskID) + if e != nil { + return e } } @@ -451,29 +473,27 @@ func (w *ProcessingWorkflow) SessionHandler( MaximumAttempts: 3, }, }) - err := temporalsdk_workflow.ExecuteActivity(activityOpts, activities.UploadActivityName, &activities.UploadActivityParams{ + e = temporalsdk_workflow.ExecuteActivity(activityOpts, activities.UploadActivityName, &activities.UploadActivityParams{ AIPPath: tinfo.AIPPath, AIPID: tinfo.SIPID, Name: tinfo.req.Key, - }). - Get(activityOpts, nil) - if err != nil { - return err + }).Get(activityOpts, nil) + if e != nil { + return e } } // Complete preservation task for upload to review bucket. if !tinfo.req.AutoApproveAIP { ctx := withLocalActivityOpts(sessCtx) - err := temporalsdk_workflow.ExecuteLocalActivity(ctx, completePreservationTaskLocalActivity, w.pkgsvc, &completePreservationTaskLocalActivityParams{ + e = temporalsdk_workflow.ExecuteLocalActivity(ctx, completePreservationTaskLocalActivity, w.pkgsvc, &completePreservationTaskLocalActivityParams{ ID: uploadPreservationTaskID, Status: enums.PreservationTaskStatusDone, CompletedAt: temporalsdk_workflow.Now(sessCtx).UTC(), Note: ref.New("Moved to review bucket"), - }). - Get(ctx, nil) - if err != nil { - return err + }).Get(ctx, nil) + if e != nil { + return e } } @@ -491,25 +511,25 @@ func (w *ProcessingWorkflow) SessionHandler( // Set package to pending status. { ctx := withLocalActivityOpts(sessCtx) - err := temporalsdk_workflow.ExecuteLocalActivity(ctx, setStatusLocalActivity, w.pkgsvc, tinfo.req.PackageID, enums.PackageStatusPending).Get(ctx, nil) - if err != nil { - return err + e = temporalsdk_workflow.ExecuteLocalActivity(ctx, setStatusLocalActivity, w.pkgsvc, tinfo.req.PackageID, enums.PackageStatusPending).Get(ctx, nil) + if e != nil { + return e } } // Set preservation action to pending status. { ctx := withLocalActivityOpts(sessCtx) - err := temporalsdk_workflow.ExecuteLocalActivity(ctx, setPreservationActionStatusLocalActivity, w.pkgsvc, tinfo.PreservationActionID, enums.PreservationActionStatusPending).Get(ctx, nil) - if err != nil { - return err + e = temporalsdk_workflow.ExecuteLocalActivity(ctx, setPreservationActionStatusLocalActivity, w.pkgsvc, tinfo.PreservationActionID, enums.PreservationActionStatusPending).Get(ctx, nil) + if e != nil { + return e } } // Add preservation task for package review { ctx := withLocalActivityOpts(sessCtx) - err := temporalsdk_workflow.ExecuteLocalActivity(ctx, createPreservationTaskLocalActivity, w.pkgsvc, &createPreservationTaskLocalActivityParams{ + e = temporalsdk_workflow.ExecuteLocalActivity(ctx, createPreservationTaskLocalActivity, w.pkgsvc, &createPreservationTaskLocalActivityParams{ TaskID: uuid.NewString(), Name: "Review AIP", Status: enums.PreservationTaskStatusPending, @@ -517,8 +537,8 @@ func (w *ProcessingWorkflow) SessionHandler( Note: "Awaiting user decision", PreservationActionID: tinfo.PreservationActionID, }).Get(ctx, &reviewPreservationTaskID) - if err != nil { - return err + if e != nil { + return e } } @@ -527,18 +547,18 @@ func (w *ProcessingWorkflow) SessionHandler( // Set package to in progress status. { ctx := withLocalActivityOpts(sessCtx) - err := temporalsdk_workflow.ExecuteLocalActivity(ctx, setStatusLocalActivity, w.pkgsvc, tinfo.req.PackageID, enums.PackageStatusInProgress).Get(ctx, nil) - if err != nil { - return err + e = temporalsdk_workflow.ExecuteLocalActivity(ctx, setStatusLocalActivity, w.pkgsvc, tinfo.req.PackageID, enums.PackageStatusInProgress).Get(ctx, nil) + if e != nil { + return e } } // Set preservation action to in progress status. { ctx := withLocalActivityOpts(sessCtx) - err := temporalsdk_workflow.ExecuteLocalActivity(ctx, setPreservationActionStatusLocalActivity, w.pkgsvc, tinfo.PreservationActionID, enums.PreservationActionStatusInProgress).Get(ctx, nil) - if err != nil { - return err + e = temporalsdk_workflow.ExecuteLocalActivity(ctx, setPreservationActionStatusLocalActivity, w.pkgsvc, tinfo.PreservationActionID, enums.PreservationActionStatusInProgress).Get(ctx, nil) + if e != nil { + return e } } } @@ -549,15 +569,14 @@ func (w *ProcessingWorkflow) SessionHandler( // Record package confirmation in review preservation task if !tinfo.req.AutoApproveAIP { ctx := withLocalActivityOpts(sessCtx) - err := temporalsdk_workflow.ExecuteLocalActivity(ctx, completePreservationTaskLocalActivity, w.pkgsvc, &completePreservationTaskLocalActivityParams{ + e = temporalsdk_workflow.ExecuteLocalActivity(ctx, completePreservationTaskLocalActivity, w.pkgsvc, &completePreservationTaskLocalActivityParams{ ID: reviewPreservationTaskID, Status: enums.PreservationTaskStatusDone, CompletedAt: reviewCompletedAt, Note: ref.New("Reviewed and accepted"), - }). - Get(ctx, nil) - if err != nil { - return err + }).Get(ctx, nil) + if e != nil { + return e } } @@ -567,92 +586,88 @@ func (w *ProcessingWorkflow) SessionHandler( // Add preservation task for permanent storage move. { ctx := withLocalActivityOpts(sessCtx) - err := temporalsdk_workflow.ExecuteLocalActivity(ctx, createPreservationTaskLocalActivity, w.pkgsvc, &createPreservationTaskLocalActivityParams{ + e = temporalsdk_workflow.ExecuteLocalActivity(ctx, createPreservationTaskLocalActivity, w.pkgsvc, &createPreservationTaskLocalActivityParams{ TaskID: uuid.NewString(), Name: "Move AIP", Status: enums.PreservationTaskStatusInProgress, StartedAt: temporalsdk_workflow.Now(sessCtx).UTC(), Note: "Moving to permanent storage", PreservationActionID: tinfo.PreservationActionID, - }). - Get(ctx, &movePreservationTaskID) - if err != nil { - return err + }).Get(ctx, &movePreservationTaskID) + if e != nil { + return e } } // Move package to permanent storage { activityOpts := withActivityOptsForRequest(sessCtx) - err := temporalsdk_workflow.ExecuteActivity(activityOpts, activities.MoveToPermanentStorageActivityName, &activities.MoveToPermanentStorageActivityParams{ + e = temporalsdk_workflow.ExecuteActivity(activityOpts, activities.MoveToPermanentStorageActivityName, &activities.MoveToPermanentStorageActivityParams{ AIPID: tinfo.SIPID, LocationID: *reviewResult.LocationID, - }). - Get(activityOpts, nil) - if err != nil { - return err + }).Get(activityOpts, nil) + if e != nil { + return e } } // Poll package move to permanent storage { activityOpts := withActivityOptsForLongLivedRequest(sessCtx) - err := temporalsdk_workflow.ExecuteActivity(activityOpts, activities.PollMoveToPermanentStorageActivityName, &activities.PollMoveToPermanentStorageActivityParams{ + e = temporalsdk_workflow.ExecuteActivity(activityOpts, activities.PollMoveToPermanentStorageActivityName, &activities.PollMoveToPermanentStorageActivityParams{ AIPID: tinfo.SIPID, - }). - Get(activityOpts, nil) - if err != nil { - return err + }).Get(activityOpts, nil) + if e != nil { + return e } } // Complete preservation task for permanent storage move. { ctx := withLocalActivityOpts(sessCtx) - err := temporalsdk_workflow.ExecuteLocalActivity(ctx, completePreservationTaskLocalActivity, w.pkgsvc, &completePreservationTaskLocalActivityParams{ + e = temporalsdk_workflow.ExecuteLocalActivity(ctx, completePreservationTaskLocalActivity, w.pkgsvc, &completePreservationTaskLocalActivityParams{ ID: movePreservationTaskID, Status: enums.PreservationTaskStatusDone, CompletedAt: temporalsdk_workflow.Now(sessCtx).UTC(), Note: ref.New(fmt.Sprintf("Moved to location %s", *reviewResult.LocationID)), - }). - Get(ctx, nil) - if err != nil { - return err + }).Get(ctx, nil) + if e != nil { + return e } } // Set package location { ctx := withLocalActivityOpts(sessCtx) - err := temporalsdk_workflow.ExecuteLocalActivity(ctx, setLocationIDLocalActivity, w.pkgsvc, tinfo.req.PackageID, *reviewResult.LocationID). + e = temporalsdk_workflow.ExecuteLocalActivity(ctx, setLocationIDLocalActivity, w.pkgsvc, tinfo.req.PackageID, *reviewResult.LocationID). Get(ctx, nil) - if err != nil { - return err + if e != nil { + return e } } } else if !tinfo.req.AutoApproveAIP { // Record package rejection in review preservation task { ctx := withLocalActivityOpts(sessCtx) - err := temporalsdk_workflow.ExecuteLocalActivity(ctx, completePreservationTaskLocalActivity, w.pkgsvc, &completePreservationTaskLocalActivityParams{ + e = temporalsdk_workflow.ExecuteLocalActivity(ctx, completePreservationTaskLocalActivity, w.pkgsvc, &completePreservationTaskLocalActivityParams{ ID: reviewPreservationTaskID, Status: enums.PreservationTaskStatusDone, CompletedAt: reviewCompletedAt, Note: ref.New("Reviewed and rejected"), }).Get(ctx, nil) - if err != nil { - return err + if e != nil { + return e } } // Reject package { activityOpts := withActivityOptsForRequest(sessCtx) - err := temporalsdk_workflow.ExecuteActivity(activityOpts, activities.RejectPackageActivityName, &activities.RejectPackageActivityParams{ + e = temporalsdk_workflow.ExecuteActivity(activityOpts, activities.RejectPackageActivityName, &activities.RejectPackageActivityParams{ AIPID: tinfo.SIPID, }).Get(activityOpts, nil) - if err != nil { - return err + if e != nil { + return e } } } @@ -710,6 +725,7 @@ func (w *ProcessingWorkflow) transferAM(sessCtx temporalsdk_workflow.Context, ti if err != nil { return err } + tinfo.SendToFailedPath = zipResult.Path // Upload transfer to AMSS. activityOpts = temporalsdk_workflow.WithActivityOptions(sessCtx,