Skip to content

Commit

Permalink
Add failed SIPs/PIPs buckets
Browse files Browse the repository at this point in the history
If an error occurs before the SIP is sent to preservation, copy the
downloaded SIP file to a failed SIPs bucket. If an error occurs during
preservation, send a copy of the PIP to a failed PIPs bucket, including
all the transformations made before it was sent to preservation.

- Add failed buckets configuration.
- Register temporal-activities/bucketupload twice, under a different
  name for each failed bucket.
- Modify Kubernetes jobs to set up and re-create buckets.

[skip-codecov]
  • Loading branch information
jraddaoui committed Oct 17, 2024
1 parent 29f7053 commit dd678d1
Show file tree
Hide file tree
Showing 9 changed files with 544 additions and 6 deletions.
26 changes: 26 additions & 0 deletions cmd/enduro-a3m-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ import (
bagit_gython "github.com/artefactual-labs/bagit-gython"
"github.com/artefactual-sdps/temporal-activities/archiveextract"
"github.com/artefactual-sdps/temporal-activities/bagvalidate"
"github.com/artefactual-sdps/temporal-activities/bucketupload"
"github.com/artefactual-sdps/temporal-activities/removepaths"
"github.com/hashicorp/go-cleanhttp"
"github.com/oklog/run"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/pflag"
"go.artefactual.dev/tools/bucket"
"go.artefactual.dev/tools/log"
temporal_tools "go.artefactual.dev/tools/temporal"
"go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace"
Expand Down Expand Up @@ -203,6 +205,22 @@ func main() {
}
}()

// Set up the failed SIPs bucket from cfg.FailedSIPs. The worker logs the
// error and exits if the bucket cannot be created.
failedSIPs, err := bucket.NewWithConfig(ctx, &cfg.FailedSIPs)
if err != nil {
logger.Error(err, "Error setting up failed SIPs bucket.")
os.Exit(1)
}
defer failedSIPs.Close()

// Set up the failed PIPs bucket from cfg.FailedPIPs.
// NOTE(review): os.Exit does not run deferred functions, so on this error
// path the deferred failedSIPs.Close() above is skipped.
failedPIPs, err := bucket.NewWithConfig(ctx, &cfg.FailedPIPs)
if err != nil {
logger.Error(err, "Error setting up failed PIPs bucket.")
os.Exit(1)
}
defer failedPIPs.Close()

var g run.Group

// Activity worker.
Expand Down Expand Up @@ -306,6 +324,14 @@ func main() {
activities.NewRejectPackageActivity(storageClient).Execute,
temporalsdk_activity.RegisterOptions{Name: activities.RejectPackageActivityName},
)
// Register the same bucketupload activity twice, once per failed bucket,
// under distinct names. The activity name used at execution time therefore
// determines which bucket receives the upload.
w.RegisterActivityWithOptions(
bucketupload.New(failedSIPs).Execute,
temporalsdk_activity.RegisterOptions{Name: activities.SendToFailedSIPsName},
)
w.RegisterActivityWithOptions(
bucketupload.New(failedPIPs).Execute,
temporalsdk_activity.RegisterOptions{Name: activities.SendToFailedPIPsName},
)

g.Add(
func() error {
Expand Down
26 changes: 26 additions & 0 deletions cmd/enduro-am-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ import (
"github.com/artefactual-sdps/temporal-activities/archivezip"
"github.com/artefactual-sdps/temporal-activities/bagcreate"
"github.com/artefactual-sdps/temporal-activities/bagvalidate"
"github.com/artefactual-sdps/temporal-activities/bucketupload"
"github.com/artefactual-sdps/temporal-activities/removepaths"
"github.com/hashicorp/go-cleanhttp"
"github.com/jonboulle/clockwork"
"github.com/oklog/run"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/pflag"
"go.artefactual.dev/amclient"
"go.artefactual.dev/tools/bucket"
"go.artefactual.dev/tools/log"
temporal_tools "go.artefactual.dev/tools/temporal"
"go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace"
Expand Down Expand Up @@ -200,6 +202,22 @@ func main() {
}
}()

// Set up the failed SIPs bucket from cfg.FailedSIPs. The worker logs the
// error and exits if the bucket cannot be created.
failedSIPs, err := bucket.NewWithConfig(ctx, &cfg.FailedSIPs)
if err != nil {
logger.Error(err, "Error setting up failed SIPs bucket.")
os.Exit(1)
}
defer failedSIPs.Close()

// Set up the failed PIPs bucket from cfg.FailedPIPs.
// NOTE(review): os.Exit does not run deferred functions, so on this error
// path the deferred failedSIPs.Close() above is skipped.
failedPIPs, err := bucket.NewWithConfig(ctx, &cfg.FailedPIPs)
if err != nil {
logger.Error(err, "Error setting up failed PIPs bucket.")
os.Exit(1)
}
defer failedPIPs.Close()

var g run.Group

// Activity worker.
Expand Down Expand Up @@ -303,6 +321,14 @@ func main() {
removepaths.New().Execute,
temporalsdk_activity.RegisterOptions{Name: removepaths.Name},
)
// Register the same bucketupload activity twice, once per failed bucket,
// under distinct names. The activity name used at execution time therefore
// determines which bucket receives the upload.
w.RegisterActivityWithOptions(
bucketupload.New(failedSIPs).Execute,
temporalsdk_activity.RegisterOptions{Name: activities.SendToFailedSIPsName},
)
w.RegisterActivityWithOptions(
bucketupload.New(failedPIPs).Execute,
temporalsdk_activity.RegisterOptions{Name: activities.SendToFailedPIPsName},
)

g.Add(
func() error {
Expand Down
16 changes: 16 additions & 0 deletions enduro.toml
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,19 @@ sharedPath = "/home/enduro/preprocessing"
namespace = "default"
taskQueue = "preprocessing"
workflowName = "preprocessing"

# Bucket that receives a copy of the downloaded SIP when an error occurs
# before the SIP is sent to preservation.
# NOTE(review): accessKey/secretKey look like local development defaults;
# confirm they are overridden in any non-development deployment.
[failedSips]
endpoint = "http://minio.enduro-sdps:9000"
pathStyle = true
accessKey = "minio"
secretKey = "minio123"
region = "us-west-1"
bucket = "failed-sips"

# Bucket that receives a copy of the PIP when an error occurs during
# preservation.
[failedPips]
endpoint = "http://minio.enduro-sdps:9000"
pathStyle = true
accessKey = "minio"
secretKey = "minio123"
region = "us-west-1"
bucket = "failed-pips"
2 changes: 2 additions & 0 deletions hack/kube/base/minio-setup-buckets-job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,7 @@ spec:
mc mb enduro/aips --ignore-existing;
mc mb enduro/perma-aips-1 --ignore-existing;
mc mb enduro/perma-aips-2 --ignore-existing;
mc mb enduro/failed-sips --ignore-existing;
mc mb enduro/failed-pips --ignore-existing;
mc event add enduro/sips arn:minio:sqs::PRIMARY:redis --event put --ignore-existing",
]
2 changes: 2 additions & 0 deletions hack/kube/tools/minio-recreate-buckets-job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,7 @@ spec:
mc mb enduro/aips --ignore-existing;
mc mb enduro/perma-aips-1 --ignore-existing;
mc mb enduro/perma-aips-2 --ignore-existing;
mc mb enduro/failed-sips --ignore-existing;
mc mb enduro/failed-pips --ignore-existing;
mc event add enduro/sips arn:minio:sqs::PRIMARY:redis --event put --ignore-existing",
]
4 changes: 4 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/google/uuid"
"github.com/mitchellh/mapstructure"
"github.com/spf13/viper"
"go.artefactual.dev/tools/bucket"

"github.com/artefactual-sdps/enduro/internal/a3m"
"github.com/artefactual-sdps/enduro/internal/am"
Expand Down Expand Up @@ -53,6 +54,9 @@ type Configuration struct {
Upload package_.UploadConfig
Watcher watcher.Config
Telemetry telemetry.Config

// FailedSIPs is the configuration of the bucket that stores failed SIPs.
FailedSIPs bucket.Config
// FailedPIPs is the configuration of the bucket that stores failed PIPs.
FailedPIPs bucket.Config
}

func (c *Configuration) Validate() error {
Expand Down
2 changes: 2 additions & 0 deletions internal/workflow/activities/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ const (
PollMoveToPermanentStorageActivityName = "poll-move-to-permanent-storage-activity"
RejectPackageActivityName = "reject-package-activity"
UploadActivityName = "upload-activity"
// Names under which the bucket upload activity is registered for the
// failed SIPs bucket and the failed PIPs bucket respectively.
SendToFailedSIPsName = "send-to-failed-sips"
SendToFailedPIPsName = "send-to-failed-pips"
)
56 changes: 54 additions & 2 deletions internal/workflow/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/artefactual-sdps/temporal-activities/archivezip"
"github.com/artefactual-sdps/temporal-activities/bagcreate"
"github.com/artefactual-sdps/temporal-activities/bagvalidate"
"github.com/artefactual-sdps/temporal-activities/bucketupload"
"github.com/artefactual-sdps/temporal-activities/removepaths"
"github.com/google/uuid"
"go.artefactual.dev/tools/ref"
Expand Down Expand Up @@ -110,6 +111,13 @@ type TransferInfo struct {
// It is populated by the workflow request.
GlobalTaskQueue string
PreservationTaskQueue string

// SendToFailedPath is the local path of the SIP or PIP to upload to a
// failed bucket if the session handler ends with an error. Nothing is
// uploaded while it is empty.
SendToFailedPath string
// SendToFailedActivityName is the name of the upload activity to execute
// on failure; it selects the destination bucket (failed SIPs or failed
// PIPs). Nothing is uploaded while it is empty.
SendToFailedActivityName string
// SendToFailedNeedsZipping reports whether SendToFailedPath must be
// zipped before it is uploaded (set for the a3m PIP).
SendToFailedNeedsZipping bool
}

func (t *TransferInfo) Name() string {
Expand Down Expand Up @@ -327,9 +335,43 @@ func (w *ProcessingWorkflow) SessionHandler(
sessCtx temporalsdk_workflow.Context,
attempt int,
tinfo *TransferInfo,
) error {
) (e error) {
var cleanup cleanupRegistry
defer w.sessionCleanup(sessCtx, &cleanup)
// When the session handler returns an error and a send-to-failed target has
// been recorded in tinfo, upload the SIP/PIP to the matching failed bucket
// (zipping it first when required). Any error from these extra activities is
// joined to the returned error e. Session cleanup always runs afterwards.
defer func() {
	// Deferred so that the cleanup runs on every exit path of this closure,
	// including the early return taken when zipping fails, and only after
	// the upload has had a chance to read the files that cleanup removes.
	defer w.sessionCleanup(sessCtx, &cleanup)

	// Nothing to upload on success or before a target has been recorded.
	if e == nil || tinfo.SendToFailedActivityName == "" || tinfo.SendToFailedPath == "" {
		return
	}

	if tinfo.SendToFailedNeedsZipping {
		var zipResult archivezip.Result
		activityOpts := withActivityOptsForLocalAction(sessCtx)
		err := temporalsdk_workflow.ExecuteActivity(
			activityOpts,
			archivezip.Name,
			&archivezip.Params{SourceDir: tinfo.SendToFailedPath},
		).Get(activityOpts, &zipResult)
		if err != nil {
			// Without a zip there is nothing to upload; report both errors.
			e = errors.Join(e, err)
			return
		}
		tinfo.SendToFailedPath = zipResult.Path
	}

	var sendToFailedResult bucketupload.Result
	activityOpts := withActivityOptsForLongLivedRequest(sessCtx)
	err := temporalsdk_workflow.ExecuteActivity(
		activityOpts,
		tinfo.SendToFailedActivityName,
		&bucketupload.Params{
			Path:       tinfo.SendToFailedPath,
			Key:        fmt.Sprintf("Failed_%s", tinfo.req.Key),
			BufferSize: 100_000_000,
		},
	).Get(activityOpts, &sendToFailedResult)
	if err != nil {
		e = errors.Join(e, err)
	}
}()

packageStartedAt := temporalsdk_workflow.Now(sessCtx).UTC()

Expand Down Expand Up @@ -388,6 +430,9 @@ func (w *ProcessingWorkflow) SessionHandler(

// Delete download tmp dir when session ends.
cleanup.registerPath(filepath.Dir(downloadResult.Path))

// Record the downloaded SIP as the send-to-failed target, to be uploaded
// through the failed SIPs activity.
tinfo.SendToFailedActivityName = activities.SendToFailedSIPsName
tinfo.SendToFailedPath = downloadResult.Path
}

// Unarchive the transfer if it's not a directory and it's not part of the preprocessing child workflow.
Expand Down Expand Up @@ -808,6 +853,10 @@ func (w *ProcessingWorkflow) transferA3m(
cleanup.registerPath(bundleResult.FullPath)
}

// Switch the send-to-failed target to the bundled PIP and the failed PIPs
// activity. The bundle path is flagged as needing zipping before upload.
tinfo.SendToFailedPath = tinfo.Bundle.FullPath
tinfo.SendToFailedActivityName = activities.SendToFailedPIPsName
tinfo.SendToFailedNeedsZipping = true

// Send PIP to a3m for preservation.
{
activityOpts := temporalsdk_workflow.WithActivityOptions(sessCtx, temporalsdk_workflow.ActivityOptions{
Expand Down Expand Up @@ -869,6 +918,9 @@ func (w *ProcessingWorkflow) transferAM(ctx temporalsdk_workflow.Context, tinfo
return err
}

// Switch the send-to-failed target to the PIP and the failed PIPs activity.
// NOTE(review): zipResult.Path is presumably already a zip archive, which
// would explain why SendToFailedNeedsZipping is not set here; confirm.
tinfo.SendToFailedPath = zipResult.Path
tinfo.SendToFailedActivityName = activities.SendToFailedPIPsName

// Upload PIP to AMSS.
activityOpts = temporalsdk_workflow.WithActivityOptions(ctx,
temporalsdk_workflow.ActivityOptions{
Expand Down
Loading

0 comments on commit dd678d1

Please sign in to comment.