Skip to content

Commit

Permalink
WIP: Add failed transfers/SIPs buckets
Browse files Browse the repository at this point in the history
  • Loading branch information
jraddaoui committed Apr 25, 2024
1 parent db1a339 commit fd19cff
Show file tree
Hide file tree
Showing 8 changed files with 283 additions and 101 deletions.
38 changes: 38 additions & 0 deletions cmd/enduro-a3m-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
temporalsdk_interceptor "go.temporal.io/sdk/interceptor"
temporalsdk_worker "go.temporal.io/sdk/worker"
goahttp "goa.design/goa/v3/http"
"gocloud.dev/blob"

"github.com/artefactual-sdps/enduro/internal/a3m"
"github.com/artefactual-sdps/enduro/internal/api/auth"
Expand All @@ -38,6 +39,7 @@ import (
"github.com/artefactual-sdps/enduro/internal/persistence"
entclient "github.com/artefactual-sdps/enduro/internal/persistence/ent/client"
entdb "github.com/artefactual-sdps/enduro/internal/persistence/ent/db"
"github.com/artefactual-sdps/enduro/internal/storage"
"github.com/artefactual-sdps/enduro/internal/telemetry"
"github.com/artefactual-sdps/enduro/internal/temporal"
"github.com/artefactual-sdps/enduro/internal/version"
Expand Down Expand Up @@ -181,6 +183,38 @@ func main() {
}
}

// Set-up failed transfers bucket.
var ft *blob.Bucket

Check warning on line 187 in cmd/enduro-a3m-worker/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/enduro-a3m-worker/main.go#L187

Added line #L187 was not covered by tests
{
fl, err := storage.NewInternalLocation(&cfg.FailedTransfers)
if err != nil {
logger.Error(err, "Error setting up failed transfers location.")
os.Exit(1)

Check warning on line 192 in cmd/enduro-a3m-worker/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/enduro-a3m-worker/main.go#L189-L192

Added lines #L189 - L192 were not covered by tests
}
ft, err = fl.OpenBucket(ctx)
if err != nil {
logger.Error(err, "Error getting failed transfers bucket.")
os.Exit(1)

Check warning on line 197 in cmd/enduro-a3m-worker/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/enduro-a3m-worker/main.go#L194-L197

Added lines #L194 - L197 were not covered by tests
}
}
defer ft.Close()

Check warning on line 200 in cmd/enduro-a3m-worker/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/enduro-a3m-worker/main.go#L200

Added line #L200 was not covered by tests

// Set-up failed SIPs bucket.
var fs *blob.Bucket

Check warning on line 203 in cmd/enduro-a3m-worker/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/enduro-a3m-worker/main.go#L203

Added line #L203 was not covered by tests
{
fl, err := storage.NewInternalLocation(&cfg.FailedSIPs)
if err != nil {
logger.Error(err, "Error setting up failed SIPs location.")
os.Exit(1)

Check warning on line 208 in cmd/enduro-a3m-worker/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/enduro-a3m-worker/main.go#L205-L208

Added lines #L205 - L208 were not covered by tests
}
fs, err = fl.OpenBucket(ctx)
if err != nil {
logger.Error(err, "Error getting failed SIPs bucket.")
os.Exit(1)

Check warning on line 213 in cmd/enduro-a3m-worker/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/enduro-a3m-worker/main.go#L210-L213

Added lines #L210 - L213 were not covered by tests
}
}
defer fs.Close()

Check warning on line 216 in cmd/enduro-a3m-worker/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/enduro-a3m-worker/main.go#L216

Added line #L216 was not covered by tests

var g run.Group

// Activity worker.
Expand Down Expand Up @@ -273,6 +307,10 @@ func main() {
activities.NewRejectPackageActivity(storageClient).Execute,
temporalsdk_activity.RegisterOptions{Name: activities.RejectPackageActivityName},
)
w.RegisterActivityWithOptions(
activities.NewSendToFailedBuckeActivity(ft, fs).Execute,
temporalsdk_activity.RegisterOptions{Name: activities.SendToFailedBucketName},
)

Check warning on line 313 in cmd/enduro-a3m-worker/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/enduro-a3m-worker/main.go#L310-L313

Added lines #L310 - L313 were not covered by tests

g.Add(
func() error {
Expand Down
38 changes: 38 additions & 0 deletions cmd/enduro-am-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
temporalsdk_contrib_opentelemetry "go.temporal.io/sdk/contrib/opentelemetry"
temporalsdk_interceptor "go.temporal.io/sdk/interceptor"
temporalsdk_worker "go.temporal.io/sdk/worker"
"gocloud.dev/blob"

"github.com/artefactual-sdps/enduro/internal/am"
"github.com/artefactual-sdps/enduro/internal/api/auth"
Expand All @@ -38,6 +39,7 @@ import (
entclient "github.com/artefactual-sdps/enduro/internal/persistence/ent/client"
entdb "github.com/artefactual-sdps/enduro/internal/persistence/ent/db"
"github.com/artefactual-sdps/enduro/internal/sftp"
"github.com/artefactual-sdps/enduro/internal/storage"
"github.com/artefactual-sdps/enduro/internal/telemetry"
"github.com/artefactual-sdps/enduro/internal/temporal"
"github.com/artefactual-sdps/enduro/internal/version"
Expand Down Expand Up @@ -172,6 +174,38 @@ func main() {
)
}

// Set-up failed transfers bucket.
var ft *blob.Bucket

Check warning on line 178 in cmd/enduro-am-worker/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/enduro-am-worker/main.go#L178

Added line #L178 was not covered by tests
{
fl, err := storage.NewInternalLocation(&cfg.FailedTransfers)
if err != nil {
logger.Error(err, "Error setting up failed transfers location.")
os.Exit(1)

Check warning on line 183 in cmd/enduro-am-worker/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/enduro-am-worker/main.go#L180-L183

Added lines #L180 - L183 were not covered by tests
}
ft, err = fl.OpenBucket(ctx)
if err != nil {
logger.Error(err, "Error getting failed transfers bucket.")
os.Exit(1)

Check warning on line 188 in cmd/enduro-am-worker/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/enduro-am-worker/main.go#L185-L188

Added lines #L185 - L188 were not covered by tests
}
}
defer ft.Close()

Check warning on line 191 in cmd/enduro-am-worker/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/enduro-am-worker/main.go#L191

Added line #L191 was not covered by tests

// Set-up failed SIPs bucket.
var fs *blob.Bucket

Check warning on line 194 in cmd/enduro-am-worker/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/enduro-am-worker/main.go#L194

Added line #L194 was not covered by tests
{
fl, err := storage.NewInternalLocation(&cfg.FailedSIPs)
if err != nil {
logger.Error(err, "Error setting up failed SIPs location.")
os.Exit(1)

Check warning on line 199 in cmd/enduro-am-worker/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/enduro-am-worker/main.go#L196-L199

Added lines #L196 - L199 were not covered by tests
}
fs, err = fl.OpenBucket(ctx)
if err != nil {
logger.Error(err, "Error getting failed SIPs bucket.")
os.Exit(1)

Check warning on line 204 in cmd/enduro-am-worker/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/enduro-am-worker/main.go#L201-L204

Added lines #L201 - L204 were not covered by tests
}
}
defer fs.Close()

Check warning on line 207 in cmd/enduro-am-worker/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/enduro-am-worker/main.go#L207

Added line #L207 was not covered by tests

var g run.Group

// Activity worker.
Expand Down Expand Up @@ -259,6 +293,10 @@ func main() {
activities.NewCleanUpActivity().Execute,
temporalsdk_activity.RegisterOptions{Name: activities.CleanUpActivityName},
)
w.RegisterActivityWithOptions(
activities.NewSendToFailedBuckeActivity(ft, fs).Execute,
temporalsdk_activity.RegisterOptions{Name: activities.SendToFailedBucketName},
)

Check warning on line 299 in cmd/enduro-am-worker/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/enduro-am-worker/main.go#L296-L299

Added lines #L296 - L299 were not covered by tests

g.Add(
func() error {
Expand Down
16 changes: 16 additions & 0 deletions enduro.toml
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,19 @@ sharedPath = "/home/enduro/preprocessing"
namespace = "default"
taskQueue = "preprocessing"
workflowName = "preprocessing"

[failedtransfers]
endpoint = "http://minio.enduro-sdps:9000"
pathStyle = true
key = "minio"
secret = "minio123"
region = "us-west-1"
bucket = "failed-transfers"

[failedsips]
endpoint = "http://minio.enduro-sdps:9000"
pathStyle = true
key = "minio"
secret = "minio123"
region = "us-west-1"
bucket = "failed-sips"
2 changes: 2 additions & 0 deletions hack/kube/base/minio-setup-buckets-job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ spec:
"-c",
"mc alias set enduro http://minio.enduro-sdps:9000 ${MINIO_USER} ${MINIO_PASSWORD};
mc mb enduro/sips --ignore-existing;
mc mb enduro/failed-transfers --ignore-existing;
mc mb enduro/failed-sips --ignore-existing;
mc mb enduro/aips --ignore-existing;
mc mb enduro/perma-aips-1 --ignore-existing;
mc mb enduro/perma-aips-2 --ignore-existing;
Expand Down
2 changes: 2 additions & 0 deletions hack/kube/tools/minio-recreate-buckets-job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ spec:
"mc alias set enduro http://minio.enduro-sdps:9000 ${MINIO_USER} ${MINIO_PASSWORD};
mc rb --force --dangerous enduro;
mc mb enduro/sips --ignore-existing;
mc mb enduro/failed-transfers --ignore-existing;
mc mb enduro/failed-sips --ignore-existing;
mc mb enduro/aips --ignore-existing;
mc mb enduro/perma-aips-1 --ignore-existing;
mc mb enduro/perma-aips-2 --ignore-existing;
Expand Down
3 changes: 3 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ type Configuration struct {
Upload upload.Config
Watcher watcher.Config
Telemetry telemetry.Config

FailedTransfers storage.LocationConfig
FailedSIPs storage.LocationConfig
}

func (c Configuration) Validate() error {
Expand Down
67 changes: 67 additions & 0 deletions internal/workflow/activities/send_to_failed_bucket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package activities

import (
"context"
"os"

"gocloud.dev/blob"
)

const (
SendToFailedBucketName = "send-to-failed-bucket"
FailureSIP = "sip"
FailureTransfer = "transfer"
)

type SendToFailedBucketActivity struct {
failedTransferBucket *blob.Bucket
failedSipBucket *blob.Bucket
}

func NewSendToFailedBuckeActivity(transfer, sip *blob.Bucket) *SendToFailedBucketActivity {
return &SendToFailedBucketActivity{
failedTransferBucket: transfer,
failedSipBucket: sip,

Check warning on line 24 in internal/workflow/activities/send_to_failed_bucket.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/send_to_failed_bucket.go#L21-L24

Added lines #L21 - L24 were not covered by tests
}
}

type SendToFailedBucketParams struct {
Type string
Path string
Key string
}

type SendToFailedBucketResult struct {
FailedKey string
}

func (sf *SendToFailedBucketActivity) Execute(
ctx context.Context,
params *SendToFailedBucketParams,
) (*SendToFailedBucketResult, error) {
res := &SendToFailedBucketResult{}

Check warning on line 42 in internal/workflow/activities/send_to_failed_bucket.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/send_to_failed_bucket.go#L41-L42

Added lines #L41 - L42 were not covered by tests

f, err := os.Open(params.Path)
if err != nil {
return nil, err

Check warning on line 46 in internal/workflow/activities/send_to_failed_bucket.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/send_to_failed_bucket.go#L44-L46

Added lines #L44 - L46 were not covered by tests
}
defer f.Close()

Check warning on line 48 in internal/workflow/activities/send_to_failed_bucket.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/send_to_failed_bucket.go#L48

Added line #L48 was not covered by tests

res.FailedKey = "Failed_" + params.Key
writerOptions := &blob.WriterOptions{
ContentType: "application/octet-stream",
BufferSize: 100_000_000,

Check warning on line 53 in internal/workflow/activities/send_to_failed_bucket.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/send_to_failed_bucket.go#L50-L53

Added lines #L50 - L53 were not covered by tests
}

switch params.Type {
case FailureTransfer:
err = sf.failedTransferBucket.Upload(ctx, res.FailedKey, f, writerOptions)
case FailureSIP:
err = sf.failedSipBucket.Upload(ctx, res.FailedKey, f, writerOptions)

Check warning on line 60 in internal/workflow/activities/send_to_failed_bucket.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/send_to_failed_bucket.go#L56-L60

Added lines #L56 - L60 were not covered by tests
}
if err != nil {
return nil, err

Check warning on line 63 in internal/workflow/activities/send_to_failed_bucket.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/send_to_failed_bucket.go#L62-L63

Added lines #L62 - L63 were not covered by tests
}

return res, nil

Check warning on line 66 in internal/workflow/activities/send_to_failed_bucket.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/send_to_failed_bucket.go#L66

Added line #L66 was not covered by tests
}
Loading

0 comments on commit fd19cff

Please sign in to comment.