Skip to content

Commit

Permalink
WIP: Use storage service directly
Browse files Browse the repository at this point in the history
Use the storage service instead of the client to create the package
there after it's stored in AMSS.
  • Loading branch information
jraddaoui committed Jul 23, 2024
1 parent acce1c3 commit 5d1cd0a
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 217 deletions.
95 changes: 52 additions & 43 deletions cmd/enduro-am-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"crypto/rand"
"fmt"
"net/http"
"net/http/httptrace"
Expand All @@ -24,19 +25,15 @@ import (
"go.artefactual.dev/tools/log"
"go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/trace"
temporalsdk_activity "go.temporal.io/sdk/activity"
temporalsdk_client "go.temporal.io/sdk/client"
temporalsdk_contrib_opentelemetry "go.temporal.io/sdk/contrib/opentelemetry"
temporalsdk_interceptor "go.temporal.io/sdk/interceptor"
temporalsdk_worker "go.temporal.io/sdk/worker"
goahttp "goa.design/goa/v3/http"
"gocloud.dev/blob"

"github.com/artefactual-sdps/enduro/internal/am"
"github.com/artefactual-sdps/enduro/internal/api/auth"
goahttpstorage "github.com/artefactual-sdps/enduro/internal/api/gen/http/storage/client"
goastorage "github.com/artefactual-sdps/enduro/internal/api/gen/storage"
"github.com/artefactual-sdps/enduro/internal/config"
"github.com/artefactual-sdps/enduro/internal/db"
"github.com/artefactual-sdps/enduro/internal/event"
Expand All @@ -46,6 +43,9 @@ import (
entdb "github.com/artefactual-sdps/enduro/internal/persistence/ent/db"
"github.com/artefactual-sdps/enduro/internal/sftp"
"github.com/artefactual-sdps/enduro/internal/storage"
storage_persistence "github.com/artefactual-sdps/enduro/internal/storage/persistence"
storage_entclient "github.com/artefactual-sdps/enduro/internal/storage/persistence/ent/client"
storage_entdb "github.com/artefactual-sdps/enduro/internal/storage/persistence/ent/db"
"github.com/artefactual-sdps/enduro/internal/telemetry"
"github.com/artefactual-sdps/enduro/internal/temporal"
"github.com/artefactual-sdps/enduro/internal/version"
Expand Down Expand Up @@ -212,6 +212,53 @@ func main() {
}
defer fp.Close()

Check warning on line 213 in cmd/enduro-am-worker/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/enduro-am-worker/main.go#L213

Added line #L213 was not covered by tests

// Set up the Storage database client handler.
storageDatabase, err := db.Connect(ctx, tp, cfg.Storage.Database.Driver, cfg.Storage.Database.DSN)
if err != nil {
logger.Error(err, "Storage database configuration failed.")
os.Exit(1)

Check warning on line 219 in cmd/enduro-am-worker/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/enduro-am-worker/main.go#L216-L219

Added lines #L216 - L219 were not covered by tests
}
if cfg.Storage.Database.Migrate {
if err := db.MigrateEnduroStorageDatabase(storageDatabase); err != nil {
logger.Error(err, "Storage database migration failed.")
os.Exit(1)

Check warning on line 224 in cmd/enduro-am-worker/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/enduro-am-worker/main.go#L221-L224

Added lines #L221 - L224 were not covered by tests
}
}

// Set up the storage persistence layer.
var storagePersistence storage_persistence.Storage

Check warning on line 229 in cmd/enduro-am-worker/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/enduro-am-worker/main.go#L229

Added line #L229 was not covered by tests
{
drv := sqlcomment.NewDriver(
sql.OpenDB(cfg.Storage.Database.Driver, storageDatabase),
sqlcomment.WithDriverVerTag(),
sqlcomment.WithTags(sqlcomment.Tags{
sqlcomment.KeyApplication: appName,
}),
)
client := storage_entdb.NewClient(storage_entdb.Driver(drv))
storagePersistence = storage_persistence.WithTelemetry(
storage_entclient.NewClient(client),
tp.Tracer("storage/persistence"),
)

Check warning on line 242 in cmd/enduro-am-worker/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/enduro-am-worker/main.go#L231-L242

Added lines #L231 - L242 were not covered by tests
}

// Set up the storage service.
var storagesvc storage.Service

Check warning on line 246 in cmd/enduro-am-worker/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/enduro-am-worker/main.go#L246

Added line #L246 was not covered by tests
{
storagesvc, err = storage.NewService(
logger.WithName("storage"),
cfg.Storage,
storagePersistence,
nil,
&auth.NoopTokenVerifier{},
rand.Reader,
)
if err != nil {
logger.Error(err, "Error setting up storage service.")
os.Exit(1)

Check warning on line 258 in cmd/enduro-am-worker/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/enduro-am-worker/main.go#L248-L258

Added lines #L248 - L258 were not covered by tests
}
}

var g run.Group

// Activity worker.
Expand Down Expand Up @@ -296,9 +343,8 @@ func main() {
temporalsdk_activity.RegisterOptions{Name: am.PollIngestActivityName},
)

storageClient := newStorageClient(tp, cfg)
w.RegisterActivityWithOptions(
activities.NewCreateStoragePackageActivity(storageClient).Execute,
activities.NewCreateStoragePackageActivity(storagesvc).Execute,

Check warning on line 347 in cmd/enduro-am-worker/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/enduro-am-worker/main.go#L347

Added line #L347 was not covered by tests
temporalsdk_activity.RegisterOptions{Name: activities.CreateStoragePackageActivityName},
)
w.RegisterActivityWithOptions(
Expand Down Expand Up @@ -401,40 +447,3 @@ func main() {
}
logger.Info("Bye!")
}

func newStorageClient(tp trace.TracerProvider, cfg config.Configuration) *goastorage.Client {
httpClient := cleanhttp.DefaultPooledClient()
httpClient.Transport = otelhttp.NewTransport(
httpClient.Transport,
otelhttp.WithTracerProvider(tp),
otelhttp.WithClientTrace(func(ctx context.Context) *httptrace.ClientTrace {
return otelhttptrace.NewClientTrace(ctx)
}),
)

storageHttpClient := goahttpstorage.NewClient(
"http",
cfg.Storage.EnduroAddress,
httpClient,
goahttp.RequestEncoder,
goahttp.ResponseDecoder,
false,
)

storageClient := goastorage.NewClient(
storageHttpClient.Create(),
storageHttpClient.Submit(),
storageHttpClient.Update(),
storageHttpClient.Download(),
storageHttpClient.Move(),
storageHttpClient.MoveStatus(),
storageHttpClient.Reject(),
storageHttpClient.Show(),
storageHttpClient.Locations(),
storageHttpClient.AddLocation(),
storageHttpClient.ShowLocation(),
storageHttpClient.LocationPackages(),
)

return storageClient
}
24 changes: 4 additions & 20 deletions internal/workflow/activities/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,20 @@ package activities

import (
"context"
"errors"
"fmt"
"time"

"github.com/google/uuid"
"github.com/oklog/run"
"go.artefactual.dev/tools/temporal"
temporalsdk_activity "go.temporal.io/sdk/activity"
goa "goa.design/goa/v3/pkg"

goastorage "github.com/artefactual-sdps/enduro/internal/api/gen/storage"
"github.com/artefactual-sdps/enduro/internal/storage"
)

type CreateStoragePackageActivity struct {
client storage.Client
svc storage.Service
}

type CreateStoragePackageActivityParams struct {
Expand All @@ -32,8 +30,8 @@ type CreateStoragePackageActivityResult struct {
CreatedAt string
}

func NewCreateStoragePackageActivity(client storage.Client) *CreateStoragePackageActivity {
return &CreateStoragePackageActivity{client: client}
func NewCreateStoragePackageActivity(svc storage.Service) *CreateStoragePackageActivity {
return &CreateStoragePackageActivity{svc: svc}

Check warning on line 34 in internal/workflow/activities/storage.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/storage.go#L33-L34

Added lines #L33 - L34 were not covered by tests
}

func (a *CreateStoragePackageActivity) Execute(
Expand All @@ -51,22 +49,8 @@ func (a *CreateStoragePackageActivity) Execute(
LocationID: params.LocationID,
}

pkg, err := a.client.Create(ctx, &payload)
pkg, err := a.svc.Create(ctx, &payload)

Check warning on line 52 in internal/workflow/activities/storage.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/storage.go#L52

Added line #L52 was not covered by tests
if err != nil {
if errors.Is(err, goastorage.Unauthorized("Unauthorized")) {
return nil, temporal.NewNonRetryableError(
fmt.Errorf("%s: %v", CreateStoragePackageActivityName, err),
)
}

if serr, ok := err.(*goa.ServiceError); ok {
if serr.Name == "not_valid" {
return nil, temporal.NewNonRetryableError(
fmt.Errorf("%s: %v", CreateStoragePackageActivityName, err),
)
}
}

return nil, fmt.Errorf("%s: %v", CreateStoragePackageActivityName, err)
}

Expand Down
154 changes: 0 additions & 154 deletions internal/workflow/activities/storage_test.go

This file was deleted.

0 comments on commit 5d1cd0a

Please sign in to comment.