Skip to content

Commit

Permalink
WIP: Add failed SIPs/PIPs buckets
Browse files Browse the repository at this point in the history
  • Loading branch information
jraddaoui committed Jun 27, 2024
1 parent f3e0f70 commit f07d118
Show file tree
Hide file tree
Showing 8 changed files with 199 additions and 2 deletions.
38 changes: 38 additions & 0 deletions cmd/enduro-a3m-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
temporalsdk_interceptor "go.temporal.io/sdk/interceptor"
temporalsdk_worker "go.temporal.io/sdk/worker"
goahttp "goa.design/goa/v3/http"
"gocloud.dev/blob"

"github.com/artefactual-sdps/enduro/internal/a3m"
"github.com/artefactual-sdps/enduro/internal/api/auth"
Expand All @@ -40,6 +41,7 @@ import (
"github.com/artefactual-sdps/enduro/internal/persistence"
entclient "github.com/artefactual-sdps/enduro/internal/persistence/ent/client"
entdb "github.com/artefactual-sdps/enduro/internal/persistence/ent/db"
"github.com/artefactual-sdps/enduro/internal/storage"
"github.com/artefactual-sdps/enduro/internal/telemetry"
"github.com/artefactual-sdps/enduro/internal/temporal"
"github.com/artefactual-sdps/enduro/internal/version"
Expand Down Expand Up @@ -183,6 +185,38 @@ func main() {
}
}

// Set-up failed SIPs bucket.
var fs *blob.Bucket
{
fl, err := storage.NewInternalLocation(&cfg.FailedSIPs)
if err != nil {
logger.Error(err, "Error setting up failed SIPs location.")
os.Exit(1)
}
fs, err = fl.OpenBucket(ctx)
if err != nil {
logger.Error(err, "Error getting failed SIPs bucket.")
os.Exit(1)
}
}
defer fs.Close()

// Set-up failed PIPs bucket.
var fp *blob.Bucket
{
fl, err := storage.NewInternalLocation(&cfg.FailedPIPs)
if err != nil {
logger.Error(err, "Error setting up failed PIPs location.")
os.Exit(1)
}
fp, err = fl.OpenBucket(ctx)
if err != nil {
logger.Error(err, "Error getting failed PIPs bucket.")
os.Exit(1)
}
}
defer fp.Close()

var g run.Group

// Activity worker.
Expand Down Expand Up @@ -276,6 +310,10 @@ func main() {
activities.NewRejectPackageActivity(storageClient).Execute,
temporalsdk_activity.RegisterOptions{Name: activities.RejectPackageActivityName},
)
w.RegisterActivityWithOptions(
activities.NewSendToFailedBuckeActivity(fs, fp).Execute,
temporalsdk_activity.RegisterOptions{Name: activities.SendToFailedBucketName},
)

g.Add(
func() error {
Expand Down
38 changes: 38 additions & 0 deletions cmd/enduro-am-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
temporalsdk_interceptor "go.temporal.io/sdk/interceptor"
temporalsdk_worker "go.temporal.io/sdk/worker"
goahttp "goa.design/goa/v3/http"
"gocloud.dev/blob"

"github.com/artefactual-sdps/enduro/internal/am"
"github.com/artefactual-sdps/enduro/internal/api/auth"
Expand All @@ -44,6 +45,7 @@ import (
entclient "github.com/artefactual-sdps/enduro/internal/persistence/ent/client"
entdb "github.com/artefactual-sdps/enduro/internal/persistence/ent/db"
"github.com/artefactual-sdps/enduro/internal/sftp"
"github.com/artefactual-sdps/enduro/internal/storage"
"github.com/artefactual-sdps/enduro/internal/telemetry"
"github.com/artefactual-sdps/enduro/internal/temporal"
"github.com/artefactual-sdps/enduro/internal/version"
Expand Down Expand Up @@ -178,6 +180,38 @@ func main() {
)
}

// Set-up failed SIPs bucket.
var fs *blob.Bucket
{
fl, err := storage.NewInternalLocation(&cfg.FailedSIPs)
if err != nil {
logger.Error(err, "Error setting up failed SIPs location.")
os.Exit(1)
}
fs, err = fl.OpenBucket(ctx)
if err != nil {
logger.Error(err, "Error getting failed SIPs bucket.")
os.Exit(1)
}
}
defer fs.Close()

// Set-up failed PIPs bucket.
var fp *blob.Bucket
{
fl, err := storage.NewInternalLocation(&cfg.FailedPIPs)
if err != nil {
logger.Error(err, "Error setting up failed PIPs location.")
os.Exit(1)
}
fp, err = fl.OpenBucket(ctx)
if err != nil {
logger.Error(err, "Error getting failed PIPs bucket.")
os.Exit(1)
}
}
defer fp.Close()

var g run.Group

// Activity worker.
Expand Down Expand Up @@ -271,6 +305,10 @@ func main() {
filesys.NewRemoveActivity().Execute,
temporalsdk_activity.RegisterOptions{Name: filesys.RemoveActivityName},
)
w.RegisterActivityWithOptions(
activities.NewSendToFailedBuckeActivity(fs, fp).Execute,
temporalsdk_activity.RegisterOptions{Name: activities.SendToFailedBucketName},
)

g.Add(
func() error {
Expand Down
16 changes: 16 additions & 0 deletions enduro.toml
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,19 @@ sharedPath = "/home/enduro/preprocessing"
namespace = "default"
taskQueue = "preprocessing"
workflowName = "preprocessing"

[failedSips]
endpoint = "http://minio.enduro-sdps:9000"
pathStyle = true
key = "minio"
secret = "minio123"
region = "us-west-1"
bucket = "failed-sips"

[failedPips]
endpoint = "http://minio.enduro-sdps:9000"
pathStyle = true
key = "minio"
secret = "minio123"
region = "us-west-1"
bucket = "failed-pips"
2 changes: 2 additions & 0 deletions hack/kube/base/minio-setup-buckets-job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,7 @@ spec:
mc mb enduro/aips --ignore-existing;
mc mb enduro/perma-aips-1 --ignore-existing;
mc mb enduro/perma-aips-2 --ignore-existing;
mc mb enduro/failed-sips --ignore-existing;
mc mb enduro/failed-pips --ignore-existing;
mc event add enduro/sips arn:minio:sqs::PRIMARY:redis --event put --ignore-existing",
]
2 changes: 2 additions & 0 deletions hack/kube/tools/minio-recreate-buckets-job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,7 @@ spec:
mc mb enduro/aips --ignore-existing;
mc mb enduro/perma-aips-1 --ignore-existing;
mc mb enduro/perma-aips-2 --ignore-existing;
mc mb enduro/failed-sips --ignore-existing;
mc mb enduro/failed-pips --ignore-existing;
mc event add enduro/sips arn:minio:sqs::PRIMARY:redis --event put --ignore-existing",
]
3 changes: 3 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ type Configuration struct {
Upload upload.Config
Watcher watcher.Config
Telemetry telemetry.Config

FailedSIPs storage.LocationConfig
FailedPIPs storage.LocationConfig
}

func (c Configuration) Validate() error {
Expand Down
70 changes: 70 additions & 0 deletions internal/workflow/activities/send_to_failed_bucket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package activities

import (
"context"
"fmt"
"os"

"gocloud.dev/blob"
)

// SendToFailedBucketName is the name the send-to-failed-bucket activity is
// registered and invoked under.
const SendToFailedBucketName = "send-to-failed-bucket"

// Values accepted in SendToFailedBucketParams.Type.
const (
	// FailureSIP routes the upload to the failed SIPs bucket.
	FailureSIP = "failure-sip"
	// FailurePIP routes the upload to the failed PIPs bucket.
	FailurePIP = "failure-pip"
)

// SendToFailedBucketParams is the input of SendToFailedBucketActivity.Execute.
type SendToFailedBucketParams struct {
// Type selects the destination bucket. Execute accepts FailureSIP or
// FailurePIP and returns an error for any other value.
Type string
// Path is the local filesystem path of the file to upload.
Path string
// Key is the original object key; the file is stored under
// "Failed_" + Key.
Key string
}

// SendToFailedBucketResult is the output of SendToFailedBucketActivity.Execute.
type SendToFailedBucketResult struct {
// FailedKey is the key the file was uploaded under, i.e. "Failed_"
// followed by the Key given in the params.
FailedKey string
}

// SendToFailedBucketActivity uploads a local file to one of two buckets,
// chosen by the failure type given in the activity params.
type SendToFailedBucketActivity struct {
// failedSIPs receives uploads of type FailureSIP.
failedSIPs *blob.Bucket
// failedPIPs receives uploads of type FailurePIP.
failedPIPs *blob.Bucket
}

// NewSendToFailedBucketActivity returns an activity that uploads to
// failedSIPs for FailureSIP requests and to failedPIPs for FailurePIP
// requests.
//
// The buckets are stored as given and are not closed by the activity; the
// caller remains responsible for closing them.
func NewSendToFailedBucketActivity(failedSIPs, failedPIPs *blob.Bucket) *SendToFailedBucketActivity {
	return &SendToFailedBucketActivity{
		failedSIPs: failedSIPs,
		failedPIPs: failedPIPs,
	}
}

// NewSendToFailedBuckeActivity is the original, misspelled name of
// NewSendToFailedBucketActivity. It is kept so existing callers keep
// compiling.
//
// Deprecated: use NewSendToFailedBucketActivity instead.
func NewSendToFailedBuckeActivity(failedSIPs, failedPIPs *blob.Bucket) *SendToFailedBucketActivity {
	return NewSendToFailedBucketActivity(failedSIPs, failedPIPs)
}

// Execute uploads the file at params.Path to the failed SIPs or failed PIPs
// bucket, selected by params.Type, under the key "Failed_" + params.Key.
//
// On success it returns a result carrying the destination key. It returns an
// error, and a nil result, when params is nil, when params.Type is neither
// FailureSIP nor FailurePIP, when the selected bucket is nil, when the file
// cannot be opened, or when the upload fails.
func (sf *SendToFailedBucketActivity) Execute(
	ctx context.Context,
	params *SendToFailedBucketParams,
) (*SendToFailedBucketResult, error) {
	if params == nil {
		return nil, fmt.Errorf("SendToFailedBucketActivity: missing params")
	}

	// Resolve the destination before touching the filesystem so that an
	// invalid request fails fast and reports the real cause.
	var bucket *blob.Bucket
	switch params.Type {
	case FailureSIP:
		bucket = sf.failedSIPs
	case FailurePIP:
		bucket = sf.failedPIPs
	default:
		return nil, fmt.Errorf("SendToFailedBucketActivity: unexpected type %q", params.Type)
	}
	// Calling Upload on a nil bucket would panic; report it as an error.
	if bucket == nil {
		return nil, fmt.Errorf("SendToFailedBucketActivity: no bucket configured for type %q", params.Type)
	}

	f, err := os.Open(params.Path)
	if err != nil {
		return nil, fmt.Errorf("SendToFailedBucketActivity: %w", err)
	}
	// The file is only read, so an error from Close carries no data-loss risk.
	defer f.Close()

	failedKey := "Failed_" + params.Key
	writerOptions := &blob.WriterOptions{
		ContentType: "application/octet-stream",
		// Chunk size, in bytes, requested from the blob writer.
		// NOTE(review): 100 MB per chunk may be buffered in memory by the
		// driver; confirm this is acceptable for the workers.
		BufferSize: 100_000_000,
	}

	if err := bucket.Upload(ctx, failedKey, f, writerOptions); err != nil {
		return nil, fmt.Errorf(
			"SendToFailedBucketActivity: upload %q as %q: %w",
			params.Path, failedKey, err,
		)
	}

	return &SendToFailedBucketResult{FailedKey: failedKey}, nil
}
32 changes: 30 additions & 2 deletions internal/workflow/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ type TransferInfo struct {
// It is populated by the workflow request.
GlobalTaskQueue string
PreservationTaskQueue string

SendToFailedPath string
SendToFailedType string
}

func (t *TransferInfo) Name() string {
Expand Down Expand Up @@ -320,11 +323,31 @@ func (w *ProcessingWorkflow) SessionHandler(
sessCtx temporalsdk_workflow.Context,
attempt int,
tinfo *TransferInfo,
) error {
) (e error) {
var cleanup cleanupRegistry
defer w.sessionCleanup(sessCtx, &cleanup)
defer func() {
if e != nil && tinfo.SendToFailedType != "" && tinfo.SendToFailedPath != "" {
// TODO: make sure tinfo.SendToFailedPath is zipped.
activityOpts := withActivityOptsForLongLivedRequest(sessCtx)
sendErr := temporalsdk_workflow.ExecuteActivity(
activityOpts,
activities.SendToFailedBucketName,
&activities.SendToFailedBucketParams{
Type: tinfo.SendToFailedType,
Path: tinfo.SendToFailedPath,
Key: tinfo.req.Key,
},
).Get(activityOpts, nil)
if sendErr != nil {
e = errors.Join(e, sendErr)
}
}

w.sessionCleanup(sessCtx, &cleanup)
}()

packageStartedAt := temporalsdk_workflow.Now(sessCtx).UTC()
tinfo.SendToFailedType = activities.FailureSIP

// Set in-progress status.
{
Expand Down Expand Up @@ -381,6 +404,7 @@ func (w *ProcessingWorkflow) SessionHandler(

// Delete download tmp dir when session ends.
cleanup.registerPath(filepath.Dir(downloadResult.Path))
tinfo.SendToFailedPath = downloadResult.Path
}

// Unarchive the transfer if it's not a directory and it's not part of the preprocessing child workflow.
Expand Down Expand Up @@ -438,8 +462,11 @@ func (w *ProcessingWorkflow) SessionHandler(

// Delete bundled transfer when session ends.
cleanup.registerPath(bundleResult.FullPath)
tinfo.SendToFailedPath = tinfo.Bundle.FullPath
}

tinfo.SendToFailedType = activities.FailurePIP

// Do preservation activities.
{
var err error
Expand Down Expand Up @@ -763,6 +790,7 @@ func (w *ProcessingWorkflow) transferAM(sessCtx temporalsdk_workflow.Context, ti
if err != nil {
return err
}
tinfo.SendToFailedPath = zipResult.Path

// Upload transfer to AMSS.
activityOpts = temporalsdk_workflow.WithActivityOptions(sessCtx,
Expand Down

0 comments on commit f07d118

Please sign in to comment.