From f07d118297439a5d9a7d4b56135a067eab10de6e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Raddaoui=20Mar=C3=ADn?= Date: Thu, 25 Apr 2024 08:20:54 +0200 Subject: [PATCH] WIP: Add failed SIPs/PIPs buckets --- cmd/enduro-a3m-worker/main.go | 38 ++++++++++ cmd/enduro-am-worker/main.go | 38 ++++++++++ enduro.toml | 16 +++++ hack/kube/base/minio-setup-buckets-job.yaml | 2 + .../tools/minio-recreate-buckets-job.yaml | 2 + internal/config/config.go | 3 + .../activities/send_to_failed_bucket.go | 70 +++++++++++++++++++ internal/workflow/processing.go | 32 ++++++++- 8 files changed, 199 insertions(+), 2 deletions(-) create mode 100644 internal/workflow/activities/send_to_failed_bucket.go diff --git a/cmd/enduro-a3m-worker/main.go b/cmd/enduro-a3m-worker/main.go index 23fba16ad..58cb086d1 100644 --- a/cmd/enduro-a3m-worker/main.go +++ b/cmd/enduro-a3m-worker/main.go @@ -28,6 +28,7 @@ import ( temporalsdk_interceptor "go.temporal.io/sdk/interceptor" temporalsdk_worker "go.temporal.io/sdk/worker" goahttp "goa.design/goa/v3/http" + "gocloud.dev/blob" "github.com/artefactual-sdps/enduro/internal/a3m" "github.com/artefactual-sdps/enduro/internal/api/auth" @@ -40,6 +41,7 @@ import ( "github.com/artefactual-sdps/enduro/internal/persistence" entclient "github.com/artefactual-sdps/enduro/internal/persistence/ent/client" entdb "github.com/artefactual-sdps/enduro/internal/persistence/ent/db" + "github.com/artefactual-sdps/enduro/internal/storage" "github.com/artefactual-sdps/enduro/internal/telemetry" "github.com/artefactual-sdps/enduro/internal/temporal" "github.com/artefactual-sdps/enduro/internal/version" @@ -183,6 +185,38 @@ func main() { } } + // Set-up failed SIPs bucket. + var fs *blob.Bucket + { + fl, err := storage.NewInternalLocation(&cfg.FailedSIPs) + if err != nil { + logger.Error(err, "Error setting up failed SIPs location.") + os.Exit(1) + } + fs, err = fl.OpenBucket(ctx) + if err != nil { + logger.Error(err, "Error getting failed SIPs bucket.") + os.Exit(1) + } + } + defer fs.Close() + + // Set-up failed PIPs bucket. + var fp *blob.Bucket + { + fl, err := storage.NewInternalLocation(&cfg.FailedPIPs) + if err != nil { + logger.Error(err, "Error setting up failed PIPs location.") + os.Exit(1) + } + fp, err = fl.OpenBucket(ctx) + if err != nil { + logger.Error(err, "Error getting failed PIPs bucket.") + os.Exit(1) + } + } + defer fp.Close() + var g run.Group // Activity worker. @@ -276,6 +310,10 @@ func main() { activities.NewRejectPackageActivity(storageClient).Execute, temporalsdk_activity.RegisterOptions{Name: activities.RejectPackageActivityName}, ) + w.RegisterActivityWithOptions( + activities.NewSendToFailedBuckeActivity(fs, fp).Execute, + temporalsdk_activity.RegisterOptions{Name: activities.SendToFailedBucketName}, + ) g.Add( func() error { diff --git a/cmd/enduro-am-worker/main.go b/cmd/enduro-am-worker/main.go index d7240032a..341357071 100644 --- a/cmd/enduro-am-worker/main.go +++ b/cmd/enduro-am-worker/main.go @@ -31,6 +31,7 @@ import ( temporalsdk_interceptor "go.temporal.io/sdk/interceptor" temporalsdk_worker "go.temporal.io/sdk/worker" goahttp "goa.design/goa/v3/http" + "gocloud.dev/blob" "github.com/artefactual-sdps/enduro/internal/am" "github.com/artefactual-sdps/enduro/internal/api/auth" @@ -44,6 +45,7 @@ import ( entclient "github.com/artefactual-sdps/enduro/internal/persistence/ent/client" entdb "github.com/artefactual-sdps/enduro/internal/persistence/ent/db" "github.com/artefactual-sdps/enduro/internal/sftp" + "github.com/artefactual-sdps/enduro/internal/storage" "github.com/artefactual-sdps/enduro/internal/telemetry" "github.com/artefactual-sdps/enduro/internal/temporal" "github.com/artefactual-sdps/enduro/internal/version" @@ -178,6 +180,38 @@ func main() { ) } + // Set-up failed SIPs bucket. + var fs *blob.Bucket + { + fl, err := storage.NewInternalLocation(&cfg.FailedSIPs) + if err != nil { + logger.Error(err, "Error setting up failed SIPs location.") + os.Exit(1) + } + fs, err = fl.OpenBucket(ctx) + if err != nil { + logger.Error(err, "Error getting failed SIPs bucket.") + os.Exit(1) + } + } + defer fs.Close() + + // Set-up failed PIPs bucket. + var fp *blob.Bucket + { + fl, err := storage.NewInternalLocation(&cfg.FailedPIPs) + if err != nil { + logger.Error(err, "Error setting up failed PIPs location.") + os.Exit(1) + } + fp, err = fl.OpenBucket(ctx) + if err != nil { + logger.Error(err, "Error getting failed PIPs bucket.") + os.Exit(1) + } + } + defer fp.Close() + var g run.Group // Activity worker. @@ -271,6 +305,10 @@ func main() { filesys.NewRemoveActivity().Execute, temporalsdk_activity.RegisterOptions{Name: filesys.RemoveActivityName}, ) + w.RegisterActivityWithOptions( + activities.NewSendToFailedBuckeActivity(fs, fp).Execute, + temporalsdk_activity.RegisterOptions{Name: activities.SendToFailedBucketName}, + ) g.Add( func() error { diff --git a/enduro.toml b/enduro.toml index 254caa4e8..9f6435a3f 100644 --- a/enduro.toml +++ b/enduro.toml @@ -189,3 +189,19 @@ sharedPath = "/home/enduro/preprocessing" namespace = "default" taskQueue = "preprocessing" workflowName = "preprocessing" + +[failedSips] +endpoint = "http://minio.enduro-sdps:9000" +pathStyle = true +key = "minio" +secret = "minio123" +region = "us-west-1" +bucket = "failed-sips" + +[failedPips] +endpoint = "http://minio.enduro-sdps:9000" +pathStyle = true +key = "minio" +secret = "minio123" +region = "us-west-1" +bucket = "failed-pips" diff --git a/hack/kube/base/minio-setup-buckets-job.yaml b/hack/kube/base/minio-setup-buckets-job.yaml index ae54d3e3f..7d3f17e42 100644 --- a/hack/kube/base/minio-setup-buckets-job.yaml +++ b/hack/kube/base/minio-setup-buckets-job.yaml @@ -31,5 +31,7 @@ spec: mc mb enduro/aips --ignore-existing; mc mb enduro/perma-aips-1 --ignore-existing; mc mb enduro/perma-aips-2 --ignore-existing; + mc mb enduro/failed-sips --ignore-existing; + mc mb enduro/failed-pips --ignore-existing; mc event add enduro/sips arn:minio:sqs::PRIMARY:redis --event put --ignore-existing", ] diff --git a/hack/kube/tools/minio-recreate-buckets-job.yaml b/hack/kube/tools/minio-recreate-buckets-job.yaml index 39db9e04d..77ca12b1b 100644 --- a/hack/kube/tools/minio-recreate-buckets-job.yaml +++ b/hack/kube/tools/minio-recreate-buckets-job.yaml @@ -31,5 +31,7 @@ spec: mc mb enduro/aips --ignore-existing; mc mb enduro/perma-aips-1 --ignore-existing; mc mb enduro/perma-aips-2 --ignore-existing; + mc mb enduro/failed-sips --ignore-existing; + mc mb enduro/failed-pips --ignore-existing; mc event add enduro/sips arn:minio:sqs::PRIMARY:redis --event put --ignore-existing", ] diff --git a/internal/config/config.go b/internal/config/config.go index be8191c2d..555743808 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -49,6 +49,9 @@ type Configuration struct { Upload upload.Config Watcher watcher.Config Telemetry telemetry.Config + + FailedSIPs storage.LocationConfig + FailedPIPs storage.LocationConfig } func (c Configuration) Validate() error { diff --git a/internal/workflow/activities/send_to_failed_bucket.go b/internal/workflow/activities/send_to_failed_bucket.go new file mode 100644 index 000000000..a8ef3e86e --- /dev/null +++ b/internal/workflow/activities/send_to_failed_bucket.go @@ -0,0 +1,70 @@ +package activities + +import ( + "context" + "fmt" + "os" + + "gocloud.dev/blob" +) + +const ( + SendToFailedBucketName = "send-to-failed-bucket" + FailureSIP = "failure-sip" + FailurePIP = "failure-pip" +) + +type SendToFailedBucketParams struct { + Type string + Path string + Key string +} + +type SendToFailedBucketResult struct { + FailedKey string +} + +type SendToFailedBucketActivity struct { + failedSIPs *blob.Bucket + failedPIPs *blob.Bucket +} + +func NewSendToFailedBuckeActivity(failedSIPs, failedPIPs *blob.Bucket) *SendToFailedBucketActivity { + return &SendToFailedBucketActivity{ + failedSIPs: failedSIPs, + failedPIPs: failedPIPs, + } +} + +func (sf *SendToFailedBucketActivity) Execute( + ctx context.Context, + params *SendToFailedBucketParams, +) (*SendToFailedBucketResult, error) { + res := &SendToFailedBucketResult{} + + f, err := os.Open(params.Path) + if err != nil { + return nil, err + } + defer f.Close() + + res.FailedKey = "Failed_" + params.Key + writerOptions := &blob.WriterOptions{ + ContentType: "application/octet-stream", + BufferSize: 100_000_000, + } + + switch params.Type { + case FailureSIP: + err = sf.failedSIPs.Upload(ctx, res.FailedKey, f, writerOptions) + case FailurePIP: + err = sf.failedPIPs.Upload(ctx, res.FailedKey, f, writerOptions) + default: + err = fmt.Errorf("SendToFailedBucketActivity: unexpected type %q", params.Type) + } + if err != nil { + return nil, err + } + + return res, nil +} diff --git a/internal/workflow/processing.go b/internal/workflow/processing.go index 14e9f39e2..88dfa7758 100644 --- a/internal/workflow/processing.go +++ b/internal/workflow/processing.go @@ -104,6 +104,9 @@ type TransferInfo struct { // It is populated by the workflow request. GlobalTaskQueue string PreservationTaskQueue string + + SendToFailedPath string + SendToFailedType string } func (t *TransferInfo) Name() string { @@ -320,11 +323,31 @@ func (w *ProcessingWorkflow) SessionHandler( sessCtx temporalsdk_workflow.Context, attempt int, tinfo *TransferInfo, -) error { +) (e error) { var cleanup cleanupRegistry - defer w.sessionCleanup(sessCtx, &cleanup) + defer func() { + if e != nil && tinfo.SendToFailedType != "" && tinfo.SendToFailedPath != "" { + // TODO: make sure tinfo.SendToFailedPath is zipped. + activityOpts := withActivityOptsForLongLivedRequest(sessCtx) + sendErr := temporalsdk_workflow.ExecuteActivity( + activityOpts, + activities.SendToFailedBucketName, + &activities.SendToFailedBucketParams{ + Type: tinfo.SendToFailedType, + Path: tinfo.SendToFailedPath, + Key: tinfo.req.Key, + }, + ).Get(activityOpts, nil) + if sendErr != nil { + e = errors.Join(e, sendErr) + } + } + + w.sessionCleanup(sessCtx, &cleanup) + }() packageStartedAt := temporalsdk_workflow.Now(sessCtx).UTC() + tinfo.SendToFailedType = activities.FailureSIP // Set in-progress status. { @@ -381,6 +404,7 @@ func (w *ProcessingWorkflow) SessionHandler( // Delete download tmp dir when session ends. cleanup.registerPath(filepath.Dir(downloadResult.Path)) + tinfo.SendToFailedPath = downloadResult.Path } // Unarchive the transfer if it's not a directory and it's not part of the preprocessing child workflow. @@ -438,8 +462,11 @@ func (w *ProcessingWorkflow) SessionHandler( // Delete bundled transfer when session ends. cleanup.registerPath(bundleResult.FullPath) + tinfo.SendToFailedPath = tinfo.Bundle.FullPath } + tinfo.SendToFailedType = activities.FailurePIP + // Do preservation activities. { var err error @@ -763,6 +790,7 @@ func (w *ProcessingWorkflow) transferAM(sessCtx temporalsdk_workflow.Context, ti if err != nil { return err } + tinfo.SendToFailedPath = zipResult.Path // Upload transfer to AMSS. activityOpts = temporalsdk_workflow.WithActivityOptions(sessCtx,