From f995a8fee57ca6adcc30caf7b85d93fd8cc88810 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Raddaoui=20Mar=C3=ADn?= Date: Mon, 21 Oct 2024 17:10:31 +0200 Subject: [PATCH] Add failed SIPs/PIPs buckets If there is an error before the SIP is sent to preservation, copy the downloaded SIP file to a failed SIPs bucket. If there is an error on preservation, send a copy of the PIP to a failed PIPs bucket, with all the transformations made before it was sent to preservation. - Add failed buckets configuration. - Use temporal-activities/bucketupload registered with two different names for each failed bucket. - Modify Kubernetes jobs to set up and re-create buckets. [skip-codecov] --- cmd/enduro-a3m-worker/main.go | 31 ++ cmd/enduro-am-worker/main.go | 26 ++ enduro.toml | 16 + hack/kube/base/minio-setup-buckets-job.yaml | 2 + .../tools/minio-recreate-buckets-job.yaml | 2 + internal/config/config.go | 4 + internal/workflow/activities/activities.go | 2 + internal/workflow/processing.go | 74 +++- internal/workflow/processing_test.go | 416 +++++++++++++++++- 9 files changed, 567 insertions(+), 6 deletions(-) diff --git a/cmd/enduro-a3m-worker/main.go b/cmd/enduro-a3m-worker/main.go index 927c5a1f9..3bd0134af 100644 --- a/cmd/enduro-a3m-worker/main.go +++ b/cmd/enduro-a3m-worker/main.go @@ -15,12 +15,15 @@ import ( "entgo.io/ent/dialect/sql" bagit_gython "github.com/artefactual-labs/bagit-gython" "github.com/artefactual-sdps/temporal-activities/archiveextract" + "github.com/artefactual-sdps/temporal-activities/archivezip" "github.com/artefactual-sdps/temporal-activities/bagvalidate" + "github.com/artefactual-sdps/temporal-activities/bucketupload" "github.com/artefactual-sdps/temporal-activities/removepaths" "github.com/hashicorp/go-cleanhttp" "github.com/oklog/run" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/spf13/pflag" + "go.artefactual.dev/tools/bucket" "go.artefactual.dev/tools/log" temporal_tools "go.artefactual.dev/tools/temporal" "go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace" @@ -203,6 +206,22 @@ func main() { } }() + // Set-up failed SIPs bucket. + failedSIPs, err := bucket.NewWithConfig(ctx, &cfg.FailedSIPs) + if err != nil { + logger.Error(err, "Error setting up failed SIPs bucket.") + os.Exit(1) + } + defer failedSIPs.Close() + + // Set-up failed PIPs bucket. + failedPIPs, err := bucket.NewWithConfig(ctx, &cfg.FailedPIPs) + if err != nil { + logger.Error(err, "Error setting up failed PIPs bucket.") + os.Exit(1) + } + defer failedPIPs.Close() + var g run.Group // Activity worker. @@ -306,6 +325,18 @@ func main() { activities.NewRejectPackageActivity(storageClient).Execute, temporalsdk_activity.RegisterOptions{Name: activities.RejectPackageActivityName}, ) + w.RegisterActivityWithOptions( + archivezip.New().Execute, + temporalsdk_activity.RegisterOptions{Name: archivezip.Name}, + ) + w.RegisterActivityWithOptions( + bucketupload.New(failedSIPs).Execute, + temporalsdk_activity.RegisterOptions{Name: activities.SendToFailedSIPsName}, + ) + w.RegisterActivityWithOptions( + bucketupload.New(failedPIPs).Execute, + temporalsdk_activity.RegisterOptions{Name: activities.SendToFailedPIPsName}, + ) g.Add( func() error { diff --git a/cmd/enduro-am-worker/main.go b/cmd/enduro-am-worker/main.go index adc0a340c..ae325bdb9 100644 --- a/cmd/enduro-am-worker/main.go +++ b/cmd/enduro-am-worker/main.go @@ -18,6 +18,7 @@ import ( "github.com/artefactual-sdps/temporal-activities/archivezip" "github.com/artefactual-sdps/temporal-activities/bagcreate" "github.com/artefactual-sdps/temporal-activities/bagvalidate" + "github.com/artefactual-sdps/temporal-activities/bucketupload" "github.com/artefactual-sdps/temporal-activities/removepaths" "github.com/hashicorp/go-cleanhttp" "github.com/jonboulle/clockwork" @@ -25,6 +26,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/spf13/pflag" "go.artefactual.dev/amclient" + "go.artefactual.dev/tools/bucket" "go.artefactual.dev/tools/log" temporal_tools "go.artefactual.dev/tools/temporal" "go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace" @@ -200,6 +202,22 @@ func main() { } }() + // Set-up failed SIPs bucket. + failedSIPs, err := bucket.NewWithConfig(ctx, &cfg.FailedSIPs) + if err != nil { + logger.Error(err, "Error setting up failed SIPs bucket.") + os.Exit(1) + } + defer failedSIPs.Close() + + // Set-up failed PIPs bucket. + failedPIPs, err := bucket.NewWithConfig(ctx, &cfg.FailedPIPs) + if err != nil { + logger.Error(err, "Error setting up failed PIPs bucket.") + os.Exit(1) + } + defer failedPIPs.Close() + var g run.Group // Activity worker. @@ -303,6 +321,14 @@ func main() { removepaths.New().Execute, temporalsdk_activity.RegisterOptions{Name: removepaths.Name}, ) + w.RegisterActivityWithOptions( + bucketupload.New(failedSIPs).Execute, + temporalsdk_activity.RegisterOptions{Name: activities.SendToFailedSIPsName}, + ) + w.RegisterActivityWithOptions( + bucketupload.New(failedPIPs).Execute, + temporalsdk_activity.RegisterOptions{Name: activities.SendToFailedPIPsName}, + ) g.Add( func() error { diff --git a/enduro.toml b/enduro.toml index 48f539c5e..c3935feac 100644 --- a/enduro.toml +++ b/enduro.toml @@ -229,3 +229,19 @@ sharedPath = "/home/enduro/preprocessing" namespace = "default" taskQueue = "preprocessing" workflowName = "preprocessing" + +[failedSips] +endpoint = "http://minio.enduro-sdps:9000" +pathStyle = true +accessKey = "minio" +secretKey = "minio123" +region = "us-west-1" +bucket = "failed-sips" + +[failedPips] +endpoint = "http://minio.enduro-sdps:9000" +pathStyle = true +accessKey = "minio" +secretKey = "minio123" +region = "us-west-1" +bucket = "failed-pips" diff --git a/hack/kube/base/minio-setup-buckets-job.yaml b/hack/kube/base/minio-setup-buckets-job.yaml index ae54d3e3f..7d3f17e42 100644 --- a/hack/kube/base/minio-setup-buckets-job.yaml +++ b/hack/kube/base/minio-setup-buckets-job.yaml @@ -31,5 +31,7 @@ spec: mc mb enduro/aips --ignore-existing; mc mb enduro/perma-aips-1 --ignore-existing; mc mb enduro/perma-aips-2 --ignore-existing; + mc mb enduro/failed-sips --ignore-existing; + mc mb enduro/failed-pips --ignore-existing; mc event add enduro/sips arn:minio:sqs::PRIMARY:redis --event put --ignore-existing", ] diff --git a/hack/kube/tools/minio-recreate-buckets-job.yaml b/hack/kube/tools/minio-recreate-buckets-job.yaml index 39db9e04d..77ca12b1b 100644 --- a/hack/kube/tools/minio-recreate-buckets-job.yaml +++ b/hack/kube/tools/minio-recreate-buckets-job.yaml @@ -31,5 +31,7 @@ spec: mc mb enduro/aips --ignore-existing; mc mb enduro/perma-aips-1 --ignore-existing; mc mb enduro/perma-aips-2 --ignore-existing; + mc mb enduro/failed-sips --ignore-existing; + mc mb enduro/failed-pips --ignore-existing; mc event add enduro/sips arn:minio:sqs::PRIMARY:redis --event put --ignore-existing", ] diff --git a/internal/config/config.go b/internal/config/config.go index f83bbac7d..51df5cba0 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -14,6 +14,7 @@ import ( "github.com/google/uuid" "github.com/mitchellh/mapstructure" "github.com/spf13/viper" + "go.artefactual.dev/tools/bucket" "github.com/artefactual-sdps/enduro/internal/a3m" "github.com/artefactual-sdps/enduro/internal/am" @@ -53,6 +54,9 @@ type Configuration struct { Upload package_.UploadConfig Watcher watcher.Config Telemetry telemetry.Config + + FailedSIPs bucket.Config + FailedPIPs bucket.Config } func (c *Configuration) Validate() error { diff --git a/internal/workflow/activities/activities.go b/internal/workflow/activities/activities.go index 471a7c35f..cff381c87 100644 --- a/internal/workflow/activities/activities.go +++ b/internal/workflow/activities/activities.go @@ -12,4 +12,6 @@ const ( PollMoveToPermanentStorageActivityName = "poll-move-to-permanent-storage-activity" RejectPackageActivityName = "reject-package-activity" UploadActivityName = "upload-activity" + SendToFailedSIPsName = "send-to-failed-sips" + SendToFailedPIPsName = "send-to-failed-pips" ) diff --git a/internal/workflow/processing.go b/internal/workflow/processing.go index 85cde189c..c3d9dd742 100644 --- a/internal/workflow/processing.go +++ b/internal/workflow/processing.go @@ -18,6 +18,7 @@ import ( "github.com/artefactual-sdps/temporal-activities/archivezip" "github.com/artefactual-sdps/temporal-activities/bagcreate" "github.com/artefactual-sdps/temporal-activities/bagvalidate" + "github.com/artefactual-sdps/temporal-activities/bucketupload" "github.com/artefactual-sdps/temporal-activities/removepaths" "github.com/google/uuid" "go.artefactual.dev/tools/ref" @@ -110,6 +111,18 @@ type TransferInfo struct { // It is populated by the workflow request. GlobalTaskQueue string PreservationTaskQueue string + + // Send to failed information. + SendToFailed SendToFailed +} + +// Send to failed variables used to keep track of the SIP/PIP +// location, if it requires zipping (a3m PIP) and what activity +// needs to be called to be uploaded to the expected bucket. +type SendToFailed struct { + Path string + ActivityName string + NeedsZipping bool } func (t *TransferInfo) Name() string { @@ -327,9 +340,15 @@ func (w *ProcessingWorkflow) SessionHandler( sessCtx temporalsdk_workflow.Context, attempt int, tinfo *TransferInfo, -) error { +) (e error) { var cleanup cleanupRegistry - defer w.sessionCleanup(sessCtx, &cleanup) + defer func() { + if e != nil { + e = errors.Join(e, w.sendToFailedBucket(sessCtx, tinfo.SendToFailed, tinfo.req.Key)) + } + + w.sessionCleanup(sessCtx, &cleanup) + }() packageStartedAt := temporalsdk_workflow.Now(sessCtx).UTC() @@ -388,6 +407,9 @@ func (w *ProcessingWorkflow) SessionHandler( // Delete download tmp dir when session ends. cleanup.registerPath(filepath.Dir(downloadResult.Path)) + + tinfo.SendToFailed.Path = downloadResult.Path + tinfo.SendToFailed.ActivityName = activities.SendToFailedSIPsName } // Unarchive the transfer if it's not a directory and it's not part of the preprocessing child workflow. @@ -808,6 +830,10 @@ func (w *ProcessingWorkflow) transferA3m( cleanup.registerPath(bundleResult.FullPath) } + tinfo.SendToFailed.Path = tinfo.Bundle.FullPath + tinfo.SendToFailed.ActivityName = activities.SendToFailedPIPsName + tinfo.SendToFailed.NeedsZipping = true + // Send PIP to a3m for preservation. { activityOpts := temporalsdk_workflow.WithActivityOptions(sessCtx, temporalsdk_workflow.ActivityOptions{ @@ -869,6 +895,9 @@ func (w *ProcessingWorkflow) transferAM(ctx temporalsdk_workflow.Context, tinfo return err } + tinfo.SendToFailed.Path = zipResult.Path + tinfo.SendToFailed.ActivityName = activities.SendToFailedPIPsName + // Upload PIP to AMSS. activityOpts = temporalsdk_workflow.WithActivityOptions(ctx, temporalsdk_workflow.ActivityOptions{ @@ -1118,3 +1147,44 @@ func (w *ProcessingWorkflow) completePreservationTask( return nil } + +func (w *ProcessingWorkflow) sendToFailedBucket( + sessCtx temporalsdk_workflow.Context, + stf SendToFailed, + key string, +) error { + if stf.Path == "" || stf.ActivityName == "" { + return nil + } + + if stf.NeedsZipping { + var zipResult archivezip.Result + activityOpts := withActivityOptsForLocalAction(sessCtx) + err := temporalsdk_workflow.ExecuteActivity( + activityOpts, + archivezip.Name, + &archivezip.Params{SourceDir: stf.Path}, + ).Get(activityOpts, &zipResult) + if err != nil { + return err + } + stf.Path = zipResult.Path + } + + var sendToFailedResult bucketupload.Result + activityOpts := withActivityOptsForLongLivedRequest(sessCtx) + err := temporalsdk_workflow.ExecuteActivity( + activityOpts, + stf.ActivityName, + &bucketupload.Params{ + Path: stf.Path, + Key: fmt.Sprintf("Failed_%s", key), + BufferSize: 100_000_000, + }, + ).Get(activityOpts, &sendToFailedResult) + if err != nil { + return err + } + + return nil +} diff --git a/internal/workflow/processing_test.go b/internal/workflow/processing_test.go index fe192199c..9c61fac33 100644 --- a/internal/workflow/processing_test.go +++ b/internal/workflow/processing_test.go @@ -2,6 +2,7 @@ package workflow import ( "database/sql" + "fmt" "math/rand" "strings" "testing" @@ -11,6 +12,7 @@ import ( "github.com/artefactual-sdps/temporal-activities/archivezip" "github.com/artefactual-sdps/temporal-activities/bagcreate" "github.com/artefactual-sdps/temporal-activities/bagvalidate" + "github.com/artefactual-sdps/temporal-activities/bucketupload" "github.com/artefactual-sdps/temporal-activities/removepaths" "github.com/google/uuid" "github.com/jonboulle/clockwork" @@ -24,6 +26,7 @@ import ( temporalsdk_worker "go.temporal.io/sdk/worker" temporalsdk_workflow "go.temporal.io/sdk/workflow" "go.uber.org/mock/gomock" + "gocloud.dev/blob/memblob" "gotest.tools/v3/assert" "github.com/artefactual-sdps/enduro/internal/a3m" @@ -140,6 +143,18 @@ func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(cfg config.Configuration activities.NewDeleteOriginalActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DeleteOriginalActivityName}, ) + s.env.RegisterActivityWithOptions( + archivezip.New().Execute, + temporalsdk_activity.RegisterOptions{Name: archivezip.Name}, + ) + s.env.RegisterActivityWithOptions( + bucketupload.New(memblob.OpenBucket(nil)).Execute, + temporalsdk_activity.RegisterOptions{Name: activities.SendToFailedSIPsName}, + ) + s.env.RegisterActivityWithOptions( + bucketupload.New(memblob.OpenBucket(nil)).Execute, + temporalsdk_activity.RegisterOptions{Name: activities.SendToFailedPIPsName}, + ) s.workflow = NewProcessingWorkflow(cfg, rng, pkgsvc, wsvc) } @@ -156,10 +171,6 @@ func (s *ProcessingWorkflowTestSuite) setupAMWorkflowTest( bagcreate.New(bagcreate.Config{}).Execute, temporalsdk_activity.RegisterOptions{Name: bagcreate.Name}, ) - s.env.RegisterActivityWithOptions( - archivezip.New().Execute, - temporalsdk_activity.RegisterOptions{Name: archivezip.Name}, - ) s.env.RegisterActivityWithOptions( am.NewUploadTransferActivity(sftpc, 10*time.Second).Execute, temporalsdk_activity.RegisterOptions{Name: am.UploadTransferActivityName}, @@ -1061,3 +1072,400 @@ func (s *ProcessingWorkflowTestSuite) TestPreprocessingChildWorkflow() { s.True(s.env.IsWorkflowCompleted()) s.NoError(s.env.GetWorkflowResult(nil)) } + +func (s *ProcessingWorkflowTestSuite) TestFailedSIP() { + cfg := config.Configuration{ + A3m: a3m.Config{ShareDir: s.CreateTransferDir()}, + Preservation: pres.Config{TaskQueue: temporal.A3mWorkerTaskQueue}, + Preprocessing: preprocessing.Config{ + Enabled: true, + Extract: true, + SharedPath: "/home/enduro/preprocessing/", + Temporal: preprocessing.Temporal{ + Namespace: "default", + TaskQueue: "preprocessing", + WorkflowName: "preprocessing", + }, + }, + Storage: storage.Config{DefaultPermanentLocationID: locationID}, + } + s.SetupWorkflowTest(cfg) + + pkgID := 1 + watcherName := "watcher" + key := "transfer.zip" + retentionPeriod := 1 * time.Second + ctx := mock.AnythingOfType("*context.valueCtx") + sessionCtx := mock.AnythingOfType("*context.timerCtx") + downloadDir := strings.Replace(tempPath, "/tmp/", cfg.Preprocessing.SharedPath, 1) + prepDest := strings.Replace(extractPath, "/tmp/", cfg.Preprocessing.SharedPath, 1) + pkgsvc := s.workflow.pkgsvc + + s.env.OnActivity( + createPackageLocalActivity, + ctx, + pkgsvc, + &createPackageLocalActivityParams{Key: key, Status: enums.PackageStatusQueued}, + ).Return(pkgID, nil) + + s.env.OnActivity( + setStatusInProgressLocalActivity, + ctx, + pkgsvc, + pkgID, + startTime, + ).Return(nil, nil) + + s.env.OnActivity( + createPreservationActionLocalActivity, + ctx, + pkgsvc, + &createPreservationActionLocalActivityParams{ + WorkflowID: "default-test-workflow-id", + Type: enums.PreservationActionTypeCreateAip, + Status: enums.PreservationActionStatusInProgress, + StartedAt: startTime, + PackageID: 1, + }, + ).Return(1, nil) + + s.env.OnActivity( + activities.DownloadActivityName, + sessionCtx, + &activities.DownloadActivityParams{ + Key: key, + WatcherName: watcherName, + DestinationPath: cfg.Preprocessing.SharedPath, + }, + ).Return(&activities.DownloadActivityResult{Path: downloadDir + "/" + key}, nil) + + s.env.OnWorkflow( + preprocessingChildWorkflow, + mock.AnythingOfType("*internal.valueCtx"), + &preprocessing.WorkflowParams{ + RelativePath: strings.TrimPrefix(downloadDir+"/"+key, cfg.Preprocessing.SharedPath), + }, + ).Return( + &preprocessing.WorkflowResult{ + Outcome: preprocessing.OutcomeContentError, + RelativePath: strings.TrimPrefix(prepDest, cfg.Preprocessing.SharedPath), + }, + nil, + ) + + s.env.OnActivity( + activities.SendToFailedSIPsName, + sessionCtx, + &bucketupload.Params{ + Path: downloadDir + "/" + key, + Key: fmt.Sprintf("Failed_%s", key), + BufferSize: 100_000_000, + }, + ).Return(&bucketupload.Result{}, nil) + + s.env.OnActivity( + updatePackageLocalActivity, + ctx, + pkgsvc, + mock.AnythingOfType("*workflow.updatePackageLocalActivityParams"), + ).Return(nil, nil) + + s.env.OnActivity( + completePreservationActionLocalActivity, + ctx, + pkgsvc, + mock.AnythingOfType("*workflow.completePreservationActionLocalActivityParams"), + ).Return(nil, nil) + + s.env.OnActivity( + removepaths.Name, + sessionCtx, + &removepaths.Params{Paths: []string{downloadDir}}, + ).Return(&removepaths.Result{}, nil) + + s.env.ExecuteWorkflow( + s.workflow.Execute, + &package_.ProcessingWorkflowRequest{ + Key: key, + WatcherName: watcherName, + RetentionPeriod: &retentionPeriod, + AutoApproveAIP: true, + DefaultPermanentLocationID: &cfg.Storage.DefaultPermanentLocationID, + }, + ) + + s.True(s.env.IsWorkflowCompleted()) + s.Error(s.env.GetWorkflowResult(nil)) +} + +func (s *ProcessingWorkflowTestSuite) TestFailedPIPA3m() { + cfg := config.Configuration{ + A3m: a3m.Config{ShareDir: s.CreateTransferDir()}, + Preservation: pres.Config{TaskQueue: temporal.A3mWorkerTaskQueue}, + Storage: storage.Config{DefaultPermanentLocationID: locationID}, + } + s.SetupWorkflowTest(cfg) + + pkgID := 1 + watcherName := "watcher" + key := "transfer.zip" + retentionPeriod := 1 * time.Second + ctx := mock.AnythingOfType("*context.valueCtx") + sessionCtx := mock.AnythingOfType("*context.timerCtx") + downloadDir := strings.Replace(tempPath, "/tmp/", cfg.Preprocessing.SharedPath, 1) + pkgsvc := s.workflow.pkgsvc + + s.env.OnActivity( + createPackageLocalActivity, + ctx, + pkgsvc, + &createPackageLocalActivityParams{Key: key, Status: enums.PackageStatusQueued}, + ).Return(pkgID, nil) + + s.env.OnActivity( + setStatusInProgressLocalActivity, + ctx, + pkgsvc, + pkgID, + startTime, + ).Return(nil, nil) + + s.env.OnActivity( + createPreservationActionLocalActivity, + ctx, + pkgsvc, + &createPreservationActionLocalActivityParams{ + WorkflowID: "default-test-workflow-id", + Type: enums.PreservationActionTypeCreateAip, + Status: enums.PreservationActionStatusInProgress, + StartedAt: startTime, + PackageID: 1, + }, + ).Return(1, nil) + + s.env.OnActivity( + activities.DownloadActivityName, + sessionCtx, + &activities.DownloadActivityParams{ + Key: key, + WatcherName: watcherName, + DestinationPath: cfg.Preprocessing.SharedPath, + }, + ).Return(&activities.DownloadActivityResult{Path: downloadDir + "/" + key}, nil) + + s.env.OnActivity( + archiveextract.Name, + sessionCtx, + &archiveextract.Params{SourcePath: downloadDir + "/" + key}, + ).Return(&archiveextract.Result{ExtractPath: extractPath}, nil) + + s.env.OnActivity( + activities.ClassifyPackageActivityName, + sessionCtx, + activities.ClassifyPackageActivityParams{Path: extractPath}, + ).Return(&activities.ClassifyPackageActivityResult{Type: enums.PackageTypeBagIt}, nil) + + s.env.OnActivity( + createPreservationTaskLocalActivity, + ctx, + &createPreservationTaskLocalActivityParams{ + PkgSvc: pkgsvc, + RNG: rand.New(rand.NewSource(1)), // #nosec: G404 + PreservationTask: datatypes.PreservationTask{ + Name: "Validate Bag", + Status: enums.PreservationTaskStatusInProgress, + StartedAt: sql.NullTime{ + Time: startTime, + Valid: true, + }, + PreservationActionID: 1, + }, + }, + ).Return(101, nil) + + s.env.OnActivity( + bagvalidate.Name, + sessionCtx, + &bagvalidate.Params{Path: extractPath}, + ).Return(&bagvalidate.Result{Valid: true}, nil) + + s.env.OnActivity( + completePreservationTaskLocalActivity, + ctx, + pkgsvc, + &completePreservationTaskLocalActivityParams{ + ID: 101, + Status: enums.PreservationTaskStatusDone, + CompletedAt: startTime, + Note: ref.New("Bag is valid"), + }, + ).Return(&completePreservationTaskLocalActivityResult{}, nil) + + s.env.OnActivity( + activities.BundleActivityName, + sessionCtx, + &activities.BundleActivityParams{ + SourcePath: extractPath, + TransferDir: s.transferDir, + IsDir: true, + }, + ).Return(&activities.BundleActivityResult{FullPath: transferPath}, nil) + + s.env.OnActivity(a3m.CreateAIPActivityName, sessionCtx, mock.AnythingOfType("*a3m.CreateAIPActivityParams")). + Return(nil, fmt.Errorf("a3m error")) + + s.env.OnActivity(archivezip.Name, sessionCtx, &archivezip.Params{SourceDir: transferPath}). + Return(&archivezip.Result{Path: transferPath + ".zip"}, nil) + + s.env.OnActivity( + activities.SendToFailedPIPsName, + sessionCtx, + &bucketupload.Params{ + Path: transferPath + ".zip", + Key: fmt.Sprintf("Failed_%s", key), + BufferSize: 100_000_000, + }, + ).Return(&bucketupload.Result{}, nil) + + s.env.OnActivity( + updatePackageLocalActivity, + ctx, + pkgsvc, + mock.AnythingOfType("*workflow.updatePackageLocalActivityParams"), + ).Return(nil, nil) + + s.env.OnActivity( + completePreservationActionLocalActivity, + ctx, + pkgsvc, + mock.AnythingOfType("*workflow.completePreservationActionLocalActivityParams"), + ).Return(nil, nil) + + s.env.OnActivity( + removepaths.Name, + sessionCtx, + &removepaths.Params{Paths: []string{downloadDir, transferPath}}, + ).Return(&removepaths.Result{}, nil) + + s.env.ExecuteWorkflow( + s.workflow.Execute, + &package_.ProcessingWorkflowRequest{ + Key: key, + WatcherName: watcherName, + RetentionPeriod: &retentionPeriod, + AutoApproveAIP: true, + DefaultPermanentLocationID: &cfg.Storage.DefaultPermanentLocationID, + }, + ) + + s.True(s.env.IsWorkflowCompleted()) + s.Error(s.env.GetWorkflowResult(nil)) +} + +func (s *ProcessingWorkflowTestSuite) TestFailedPIPAM() { + cfg := config.Configuration{ + Preservation: pres.Config{TaskQueue: temporal.AmWorkerTaskQueue}, + Storage: storage.Config{DefaultPermanentLocationID: amssLocationID}, + } + s.SetupWorkflowTest(cfg) + + pkgID := 1 + watcherName := "watcher" + key := "transfer.zip" + retentionPeriod := 1 * time.Second + ctx := mock.AnythingOfType("*context.valueCtx") + sessionCtx := mock.AnythingOfType("*context.timerCtx") + pkgsvc := s.workflow.pkgsvc + + s.env.OnActivity( + createPackageLocalActivity, + ctx, + pkgsvc, + &createPackageLocalActivityParams{Key: key, Status: enums.PackageStatusQueued}, + ).Return(pkgID, nil) + + s.env.OnActivity(setStatusInProgressLocalActivity, ctx, pkgsvc, pkgID, mock.AnythingOfType("time.Time")). + Return(nil, nil) + + s.env.OnActivity( + createPreservationActionLocalActivity, + ctx, + pkgsvc, + mock.AnythingOfType("*workflow.createPreservationActionLocalActivityParams"), + ).Return(0, nil) + + s.env.OnActivity( + activities.DownloadActivityName, + sessionCtx, + &activities.DownloadActivityParams{Key: key, WatcherName: watcherName}, + ).Return(&activities.DownloadActivityResult{Path: tempPath + "/" + key}, nil) + + s.env.OnActivity( + archiveextract.Name, + sessionCtx, + &archiveextract.Params{SourcePath: tempPath + "/" + key}, + ).Return(&archiveextract.Result{ExtractPath: extractPath}, nil) + + s.env.OnActivity( + activities.ClassifyPackageActivityName, + sessionCtx, + activities.ClassifyPackageActivityParams{Path: extractPath}, + ).Return(&activities.ClassifyPackageActivityResult{Type: enums.PackageTypeUnknown}, nil) + + s.env.OnActivity(bagcreate.Name, sessionCtx, &bagcreate.Params{SourcePath: extractPath}). + Return(&bagcreate.Result{BagPath: extractPath}, nil) + + s.env.OnActivity(archivezip.Name, sessionCtx, &archivezip.Params{SourceDir: extractPath}). + Return(&archivezip.Result{Path: extractPath + "/transfer.zip"}, nil) + + s.env.OnActivity( + am.UploadTransferActivityName, + sessionCtx, + &am.UploadTransferActivityParams{SourcePath: extractPath + "/transfer.zip"}, + ).Return(nil, fmt.Errorf("AM error")) + + s.env.OnActivity( + activities.SendToFailedPIPsName, + sessionCtx, + &bucketupload.Params{ + Path: extractPath + "/transfer.zip", + Key: fmt.Sprintf("Failed_%s", key), + BufferSize: 100_000_000, + }, + ).Return(&bucketupload.Result{}, nil) + + s.env.OnActivity( + updatePackageLocalActivity, + ctx, + pkgsvc, + mock.AnythingOfType("*workflow.updatePackageLocalActivityParams"), + ).Return(nil, nil) + + s.env.OnActivity( + completePreservationActionLocalActivity, + ctx, + pkgsvc, + mock.AnythingOfType("*workflow.completePreservationActionLocalActivityParams"), + ).Return(nil, nil) + + s.env.OnActivity( + removepaths.Name, + sessionCtx, + &removepaths.Params{Paths: []string{tempPath}}, + ).Return(&removepaths.Result{}, nil) + + s.env.ExecuteWorkflow( + s.workflow.Execute, + &package_.ProcessingWorkflowRequest{ + WatcherName: watcherName, + RetentionPeriod: &retentionPeriod, + AutoApproveAIP: true, + DefaultPermanentLocationID: &cfg.Storage.DefaultPermanentLocationID, + Key: key, + TransferDeadline: time.Second, + }, + ) + + s.True(s.env.IsWorkflowCompleted()) + s.Error(s.env.GetWorkflowResult(nil)) +}