Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add failed SIPs/PIPs buckets #930

Merged
merged 1 commit into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions cmd/enduro-a3m-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@ import (
"entgo.io/ent/dialect/sql"
bagit_gython "github.com/artefactual-labs/bagit-gython"
"github.com/artefactual-sdps/temporal-activities/archiveextract"
"github.com/artefactual-sdps/temporal-activities/archivezip"
"github.com/artefactual-sdps/temporal-activities/bagvalidate"
"github.com/artefactual-sdps/temporal-activities/bucketupload"
"github.com/artefactual-sdps/temporal-activities/removepaths"
"github.com/hashicorp/go-cleanhttp"
"github.com/oklog/run"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/pflag"
"go.artefactual.dev/tools/bucket"
"go.artefactual.dev/tools/log"
temporal_tools "go.artefactual.dev/tools/temporal"
"go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace"
Expand Down Expand Up @@ -203,6 +206,22 @@ func main() {
}
}()

// Set-up failed SIPs bucket.
failedSIPs, err := bucket.NewWithConfig(ctx, &cfg.FailedSIPs)
if err != nil {
logger.Error(err, "Error setting up failed SIPs bucket.")
os.Exit(1)
}
defer failedSIPs.Close()

// Set-up failed PIPs bucket.
failedPIPs, err := bucket.NewWithConfig(ctx, &cfg.FailedPIPs)
if err != nil {
logger.Error(err, "Error setting up failed PIPs bucket.")
os.Exit(1)
}
defer failedPIPs.Close()

var g run.Group

// Activity worker.
Expand Down Expand Up @@ -306,6 +325,18 @@ func main() {
activities.NewRejectPackageActivity(storageClient).Execute,
temporalsdk_activity.RegisterOptions{Name: activities.RejectPackageActivityName},
)
w.RegisterActivityWithOptions(
archivezip.New().Execute,
temporalsdk_activity.RegisterOptions{Name: archivezip.Name},
)
w.RegisterActivityWithOptions(
bucketupload.New(failedSIPs).Execute,
temporalsdk_activity.RegisterOptions{Name: activities.SendToFailedSIPsName},
)
w.RegisterActivityWithOptions(
bucketupload.New(failedPIPs).Execute,
temporalsdk_activity.RegisterOptions{Name: activities.SendToFailedPIPsName},
)

g.Add(
func() error {
Expand Down
26 changes: 26 additions & 0 deletions cmd/enduro-am-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ import (
"github.com/artefactual-sdps/temporal-activities/archivezip"
"github.com/artefactual-sdps/temporal-activities/bagcreate"
"github.com/artefactual-sdps/temporal-activities/bagvalidate"
"github.com/artefactual-sdps/temporal-activities/bucketupload"
"github.com/artefactual-sdps/temporal-activities/removepaths"
"github.com/hashicorp/go-cleanhttp"
"github.com/jonboulle/clockwork"
"github.com/oklog/run"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/pflag"
"go.artefactual.dev/amclient"
"go.artefactual.dev/tools/bucket"
"go.artefactual.dev/tools/log"
temporal_tools "go.artefactual.dev/tools/temporal"
"go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace"
Expand Down Expand Up @@ -200,6 +202,22 @@ func main() {
}
}()

// Set-up failed SIPs bucket.
failedSIPs, err := bucket.NewWithConfig(ctx, &cfg.FailedSIPs)
if err != nil {
logger.Error(err, "Error setting up failed SIPs bucket.")
os.Exit(1)
}
defer failedSIPs.Close()

// Set-up failed PIPs bucket.
failedPIPs, err := bucket.NewWithConfig(ctx, &cfg.FailedPIPs)
if err != nil {
logger.Error(err, "Error setting up failed PIPs bucket.")
os.Exit(1)
}
defer failedPIPs.Close()

var g run.Group

// Activity worker.
Expand Down Expand Up @@ -303,6 +321,14 @@ func main() {
removepaths.New().Execute,
temporalsdk_activity.RegisterOptions{Name: removepaths.Name},
)
w.RegisterActivityWithOptions(
bucketupload.New(failedSIPs).Execute,
temporalsdk_activity.RegisterOptions{Name: activities.SendToFailedSIPsName},
)
w.RegisterActivityWithOptions(
bucketupload.New(failedPIPs).Execute,
temporalsdk_activity.RegisterOptions{Name: activities.SendToFailedPIPsName},
)

g.Add(
func() error {
Expand Down
16 changes: 16 additions & 0 deletions enduro.toml
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,19 @@ sharedPath = "/home/enduro/preprocessing"
namespace = "default"
taskQueue = "preprocessing"
workflowName = "preprocessing"

[failedSips]
endpoint = "http://minio.enduro-sdps:9000"
pathStyle = true
accessKey = "minio"
secretKey = "minio123"
region = "us-west-1"
bucket = "failed-sips"

[failedPips]
endpoint = "http://minio.enduro-sdps:9000"
pathStyle = true
accessKey = "minio"
secretKey = "minio123"
region = "us-west-1"
bucket = "failed-pips"
2 changes: 2 additions & 0 deletions hack/kube/base/minio-setup-buckets-job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,7 @@ spec:
mc mb enduro/aips --ignore-existing;
mc mb enduro/perma-aips-1 --ignore-existing;
mc mb enduro/perma-aips-2 --ignore-existing;
mc mb enduro/failed-sips --ignore-existing;
mc mb enduro/failed-pips --ignore-existing;
mc event add enduro/sips arn:minio:sqs::PRIMARY:redis --event put --ignore-existing",
]
2 changes: 2 additions & 0 deletions hack/kube/tools/minio-recreate-buckets-job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,7 @@ spec:
mc mb enduro/aips --ignore-existing;
mc mb enduro/perma-aips-1 --ignore-existing;
mc mb enduro/perma-aips-2 --ignore-existing;
mc mb enduro/failed-sips --ignore-existing;
mc mb enduro/failed-pips --ignore-existing;
mc event add enduro/sips arn:minio:sqs::PRIMARY:redis --event put --ignore-existing",
]
4 changes: 4 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/google/uuid"
"github.com/mitchellh/mapstructure"
"github.com/spf13/viper"
"go.artefactual.dev/tools/bucket"

"github.com/artefactual-sdps/enduro/internal/a3m"
"github.com/artefactual-sdps/enduro/internal/am"
Expand Down Expand Up @@ -53,6 +54,9 @@ type Configuration struct {
Upload package_.UploadConfig
Watcher watcher.Config
Telemetry telemetry.Config

FailedSIPs bucket.Config
FailedPIPs bucket.Config
}

func (c *Configuration) Validate() error {
Expand Down
2 changes: 2 additions & 0 deletions internal/workflow/activities/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ const (
PollMoveToPermanentStorageActivityName = "poll-move-to-permanent-storage-activity"
RejectPackageActivityName = "reject-package-activity"
UploadActivityName = "upload-activity"
SendToFailedSIPsName = "send-to-failed-sips"
SendToFailedPIPsName = "send-to-failed-pips"
)
74 changes: 72 additions & 2 deletions internal/workflow/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/artefactual-sdps/temporal-activities/archivezip"
"github.com/artefactual-sdps/temporal-activities/bagcreate"
"github.com/artefactual-sdps/temporal-activities/bagvalidate"
"github.com/artefactual-sdps/temporal-activities/bucketupload"
"github.com/artefactual-sdps/temporal-activities/removepaths"
"github.com/google/uuid"
"go.artefactual.dev/tools/ref"
Expand Down Expand Up @@ -110,6 +111,18 @@ type TransferInfo struct {
// It is populated by the workflow request.
GlobalTaskQueue string
PreservationTaskQueue string

// Send to failed information.
SendToFailed SendToFailed
}

// Send to failed variables used to keep track of the SIP/PIP
// location, if it requires zipping (a3m PIP) and what activity
// needs to be called to be uploaded to the expected bucket.
type SendToFailed struct {
Path string
ActivityName string
NeedsZipping bool
}

func (t *TransferInfo) Name() string {
Expand Down Expand Up @@ -327,9 +340,15 @@ func (w *ProcessingWorkflow) SessionHandler(
sessCtx temporalsdk_workflow.Context,
attempt int,
tinfo *TransferInfo,
) error {
) (e error) {
var cleanup cleanupRegistry
defer w.sessionCleanup(sessCtx, &cleanup)
defer func() {
if e != nil {
e = errors.Join(e, w.sendToFailedBucket(sessCtx, tinfo.SendToFailed, tinfo.req.Key))
}

w.sessionCleanup(sessCtx, &cleanup)
}()

packageStartedAt := temporalsdk_workflow.Now(sessCtx).UTC()

Expand Down Expand Up @@ -388,6 +407,9 @@ func (w *ProcessingWorkflow) SessionHandler(

// Delete download tmp dir when session ends.
cleanup.registerPath(filepath.Dir(downloadResult.Path))

tinfo.SendToFailed.Path = downloadResult.Path
tinfo.SendToFailed.ActivityName = activities.SendToFailedSIPsName
}

// Unarchive the transfer if it's not a directory and it's not part of the preprocessing child workflow.
Expand Down Expand Up @@ -808,6 +830,10 @@ func (w *ProcessingWorkflow) transferA3m(
cleanup.registerPath(bundleResult.FullPath)
}

tinfo.SendToFailed.Path = tinfo.Bundle.FullPath
tinfo.SendToFailed.ActivityName = activities.SendToFailedPIPsName
tinfo.SendToFailed.NeedsZipping = true

// Send PIP to a3m for preservation.
{
activityOpts := temporalsdk_workflow.WithActivityOptions(sessCtx, temporalsdk_workflow.ActivityOptions{
Expand Down Expand Up @@ -869,6 +895,9 @@ func (w *ProcessingWorkflow) transferAM(ctx temporalsdk_workflow.Context, tinfo
return err
}

tinfo.SendToFailed.Path = zipResult.Path
tinfo.SendToFailed.ActivityName = activities.SendToFailedPIPsName

// Upload PIP to AMSS.
activityOpts = temporalsdk_workflow.WithActivityOptions(ctx,
temporalsdk_workflow.ActivityOptions{
Expand Down Expand Up @@ -1118,3 +1147,44 @@ func (w *ProcessingWorkflow) completePreservationTask(

return nil
}

func (w *ProcessingWorkflow) sendToFailedBucket(
sessCtx temporalsdk_workflow.Context,
stf SendToFailed,
key string,
) error {
if stf.Path == "" || stf.ActivityName == "" {
return nil
}

if stf.NeedsZipping {
var zipResult archivezip.Result
activityOpts := withActivityOptsForLocalAction(sessCtx)
err := temporalsdk_workflow.ExecuteActivity(
activityOpts,
archivezip.Name,
&archivezip.Params{SourceDir: stf.Path},
).Get(activityOpts, &zipResult)
if err != nil {
return err
}
stf.Path = zipResult.Path
}

var sendToFailedResult bucketupload.Result
activityOpts := withActivityOptsForLongLivedRequest(sessCtx)
err := temporalsdk_workflow.ExecuteActivity(
activityOpts,
stf.ActivityName,
&bucketupload.Params{
Path: stf.Path,
Key: fmt.Sprintf("Failed_%s", key),
BufferSize: 100_000_000,
},
).Get(activityOpts, &sendToFailedResult)
if err != nil {
return err
}

return nil
}
Loading
Loading